Logging¶
Structured logging for observability and debugging.
Structured Logging with structlog¶
Setup¶
import structlog
import logging
# One-time structlog configuration, run at import: every logger obtained via
# structlog.get_logger() shares this processor pipeline.
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pull in bind_contextvars() context (see middleware below)
        structlog.stdlib.add_log_level,  # adds the "level" key seen in the sample output
        structlog.stdlib.add_logger_name,  # adds the logger's name under "logger"
        structlog.processors.TimeStamper(fmt="iso"),  # ISO-8601 "timestamp" key
        structlog.processors.StackInfoRenderer(),  # renders stack_info=True requests
        structlog.processors.format_exc_info,  # renders exc_info tracebacks into the event
        structlog.processors.UnicodeDecoder(),  # decodes bytes values to str before rendering
        structlog.processors.JSONRenderer()  # final step: one JSON object per line
    ],
    # Filter below INFO cheaply at the bound-logger level (debug calls are dropped).
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    context_class=dict,
    logger_factory=structlog.PrintLoggerFactory(),  # print()-based output (stdout)
    cache_logger_on_first_use=True,  # avoid re-building the logger on every call
)
Basic Usage¶
# Event names are terse snake_case identifiers; all context rides as kwargs
# and is rendered into the JSON output (see sample below).
log = structlog.get_logger()
# Simple message
log.info("application_started")
# With context
log.info("user_login", user_id=123, ip="192.168.1.1")
# Different levels
log.debug("cache_hit", key="user:123")  # dropped by the INFO filter configured above
log.warning("rate_limit_approaching", current=95, limit=100)
log.error("payment_failed", order_id=456, error="Card declined")
Output:
{"event": "user_login", "user_id": 123, "ip": "192.168.1.1", "level": "info", "timestamp": "2024-01-15T10:30:00Z"}
Request Context¶
FastAPI Middleware¶
import structlog
from contextvars import ContextVar
from fastapi import Request
import uuid
# Per-request correlation ID, readable from anywhere in the request's task.
request_id: ContextVar[str] = ContextVar("request_id", default="")


@app.middleware("http")
async def logging_middleware(request: Request, call_next):
    """Bind a request ID and request-scoped context to every log line.

    Honors an incoming ``X-Request-ID`` header so IDs correlate across
    services; otherwise generates a fresh UUID. The ID is echoed back to the
    client on the response.
    """
    # NOTE: dict.get(key, default) evaluates its default eagerly, which would
    # build a UUID on every request even when the header is present — use
    # `or` so the UUID is only generated when actually needed.
    req_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    request_id.set(req_id)
    # Clear then bind: prevents context leaking between requests that are
    # served by the same worker.
    structlog.contextvars.clear_contextvars()
    structlog.contextvars.bind_contextvars(
        request_id=req_id,
        path=request.url.path,
        method=request.method,
    )
    log = structlog.get_logger()
    log.info("request_started")
    response = await call_next(request)
    log.info("request_completed", status_code=response.status_code)
    response.headers["X-Request-ID"] = req_id
    return response
Adding User Context¶
from fastapi import Depends
async def get_current_user(token: str = Depends(oauth2_scheme)):
    """Resolve the authenticated user and attach their identity to the log context."""
    user = await verify_token(token)
    # Every log line emitted for the rest of this request now carries the
    # user's identity automatically.
    structlog.contextvars.bind_contextvars(user_id=user.id, user_email=user.email)
    return user
# All subsequent logs will include user_id and user_email
Log Levels¶
| Level | When to Use |
|---|---|
| DEBUG | Detailed diagnostic info (not in production) |
| INFO | Normal operation, business events |
| WARNING | Unexpected but handled situations |
| ERROR | Errors that affect functionality |
| CRITICAL | System-wide failures |
Examples¶
# Worked examples for each level — note ERROR/CRITICAL are reserved for
# actual failures, not merely surprising situations.
# DEBUG: Verbose diagnostic info
log.debug("cache_lookup", key="user:123", hit=True)
# INFO: Business events, state changes
log.info("order_created", order_id=456, total=99.99)
log.info("user_registered", user_id=789)
# WARNING: Degraded but functional
log.warning("rate_limit_approaching", current=95, limit=100)
log.warning("retry_attempt", attempt=2, max_attempts=3)
# ERROR: Failed operations
log.error("payment_failed", order_id=456, error="Card declined")
log.error("external_api_error", service="stripe", status=500)
# CRITICAL: System failures
log.critical("database_connection_lost")
log.critical("out_of_disk_space", available_mb=10)
Error Logging¶
Exception Logging¶
try:
    result = risky_operation()
except Exception as e:
    # log.exception() logs at ERROR level and automatically attaches the
    # active exception's traceback to the event.
    log.exception("operation_failed", error=str(e))
    # Includes full stack trace
    # Or with exc_info
    # Equivalent alternative: exc_info=True makes log.error attach the same
    # traceback (both lines here would emit — pick one in real code).
    log.error("operation_failed", exc_info=True)
Error Context¶
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Last-resort handler: log the failure with full context, return a generic 500.

    The response deliberately reveals nothing about the error; details go to
    the structured log only.
    """
    logger = structlog.get_logger()
    logger.error(
        "unhandled_exception",
        error_type=type(exc).__name__,
        error_message=str(exc),
        path=request.url.path,
        method=request.method,
        exc_info=True,  # attach the traceback to the log event
    )
    return JSONResponse(status_code=500, content={"detail": "Internal server error"})
Log Sanitization¶
Redacting Sensitive Data¶
# Any event key containing one of these substrings (case-insensitive) is redacted.
SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "authorization"}


def sanitize_log_data(logger, method_name, event_dict):
    """structlog processor that redacts sensitive values before rendering.

    structlog invokes every processor as ``processor(logger, method_name,
    event_dict)`` — the previous single-argument signature raised a
    TypeError the moment anything was logged through the pipeline below.

    Returns the (mutated) event_dict so the chain can continue.
    """
    for key in event_dict:
        # Substring match so e.g. "user_password" and "Authorization" are
        # both caught. Values are replaced in place; keys are untouched.
        if any(sensitive in key.lower() for sensitive in SENSITIVE_KEYS):
            event_dict[key] = "[REDACTED]"
    return event_dict
# Install the redaction processor AHEAD of the renderer so secrets are
# scrubbed before anything is serialized.
structlog.configure(
    processors=[
        # ... other processors
        sanitize_log_data,
        structlog.processors.JSONRenderer()
    ]
)
PII Handling¶
def mask_email(email: str) -> str:
    """Mask an email's local part, e.g. "john@example.com" -> "jo***@example.com".

    Splits on the LAST "@" (quoted local parts may legally contain "@"),
    where a plain split("@") would raise ValueError on such addresses.
    Raises ValueError if the string contains no "@" at all.
    """
    local, domain = email.rsplit("@", 1)
    return f"{local[:2]}***@{domain}"
def mask_ip(ip: str) -> str:
    """Mask the host portion of a dotted IPv4 address, e.g. "192.168.1.1" -> "192.168.*.*".

    Input without at least one "." (e.g. an IPv6 address or hostname) is
    fully masked instead of raising IndexError — crashing inside a logging
    path would be worse than over-masking.
    """
    parts = ip.split(".")
    if len(parts) < 2:
        return "***"
    return f"{parts[0]}.{parts[1]}.*.*"
# Usage
# Mask PII at the call site, before it ever enters the log pipeline.
log.info("user_action",
    email=mask_email(user.email),  # "jo***@example.com"
    ip=mask_ip(request.client.host)  # "192.168.*.*"
)
Performance Logging¶
Timing Operations¶
import time
from contextlib import contextmanager
@contextmanager
def log_timing(operation: str, **context):
    """Log ``<operation>_completed`` with a ``duration_ms`` key on exit.

    The duration is logged even when the wrapped code raises, because the
    emit happens in ``finally``. Extra keyword arguments are passed through
    onto the log event unchanged.
    """
    logger = structlog.get_logger()
    started = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - started) * 1000
        event = f"{operation}_completed"
        logger.info(event, duration_ms=round(elapsed_ms, 2), **context)
# Usage
# Emits database_query_completed with duration_ms and table="users",
# even if the query raises.
with log_timing("database_query", table="users"):
    users = await db.execute(select(User))
Slow Operation Warnings¶
@contextmanager
def log_slow_operation(operation: str, threshold_ms: float = 100):
    """Emit a ``<operation>_slow`` WARNING when the wrapped code exceeds threshold_ms.

    Fast operations produce no log output at all; the check runs in
    ``finally`` so slow *failing* operations are still reported.
    """
    logger = structlog.get_logger()
    began = time.perf_counter()
    try:
        yield
    finally:
        took_ms = (time.perf_counter() - began) * 1000
        if took_ms > threshold_ms:
            logger.warning(
                f"{operation}_slow",
                duration_ms=round(took_ms, 2),
                threshold_ms=threshold_ms,
            )
Configuration by Environment¶
import os
def configure_logging():
    """Configure structlog from LOG_LEVEL / LOG_FORMAT environment variables.

    LOG_FORMAT=json (the default) emits machine-readable JSON; any other
    value selects the human-friendly console renderer for development.
    An unrecognized LOG_LEVEL falls back to INFO instead of crashing.
    """
    log_level = os.getenv("LOG_LEVEL", "INFO")
    log_format = os.getenv("LOG_FORMAT", "json")
    processors = [
        structlog.contextvars.merge_contextvars,
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
    ]
    if log_format == "json":
        processors.append(structlog.processors.JSONRenderer())
    else:
        # Human-readable for development
        processors.append(structlog.dev.ConsoleRenderer())
    # getattr without a default raises AttributeError on a typo'd LOG_LEVEL
    # (e.g. "INF0"); default to INFO so misconfiguration degrades gracefully.
    level = getattr(logging, log_level.upper(), logging.INFO)
    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(level),
    )
Log Aggregation¶
Sending to External Services¶
import httpx
class LogShipper:
    """Buffers log entries in memory and ships them in batches over HTTP.

    Entries accumulate until the buffer reaches ``buffer_size``, then are
    POSTed as ``{"logs": [...]}`` with a bearer token. Intended for a single
    asyncio event loop; not thread-safe.
    """

    def __init__(self, endpoint: str, api_key: str):
        self.endpoint = endpoint
        self.api_key = api_key
        self.buffer = []
        self.buffer_size = 100  # entries per batch before an automatic flush

    async def ship(self, log_entry: dict):
        """Queue one entry, flushing automatically once the batch is full."""
        self.buffer.append(log_entry)
        if len(self.buffer) >= self.buffer_size:
            await self.flush()

    async def flush(self):
        """POST all buffered entries; on failure, re-queue them for retry.

        The buffer is swapped out BEFORE awaiting the POST: the original
        post-await ``self.buffer = []`` silently dropped any entries that
        other tasks appended while the request was in flight.
        """
        if not self.buffer:
            return
        batch, self.buffer = self.buffer, []
        try:
            async with httpx.AsyncClient() as client:
                await client.post(
                    self.endpoint,
                    json={"logs": batch},
                    headers={"Authorization": f"Bearer {self.api_key}"},
                )
        except Exception:
            # Preserve the original keep-on-error semantics (it cleared the
            # buffer only after a successful POST): put the batch back so a
            # later flush retries it, then let the caller see the failure.
            self.buffer = batch + self.buffer
            raise
Best Practices¶
- Use structured logging — JSON over plain text
- Include context — Request ID, user ID, relevant IDs
- Use appropriate levels — Don't overuse ERROR
- Sanitize sensitive data — Passwords, tokens, PII
- Log business events — Not just errors
- Include timing — Duration for slow operations
- Keep logs actionable — What happened, why it matters
See Also¶
- Error Handling -- Exception hierarchy and structlog integration for error logging
- Tracing -- Distributed tracing for correlating logs across services