Monitoring & Observability

Track application health, performance, and behavior in production.

The Three Pillars

Pillar    Purpose              Tools
Logs      What happened        structlog, ELK
Metrics   How much/how often   Prometheus, Grafana
Traces    Request journey      OpenTelemetry, Jaeger

A single request produces all three signals; the trace ID links its logs and spans:

Request → [Trace ID: abc123]
    ├─ Log: "User login attempt" {trace_id: abc123}
    ├─ Metric: http_requests_total{endpoint="/login"} +1
    └─ Span: authenticate_user (45ms)
           └─ Span: db_query (12ms)

Section Contents

Topic      Description
Logging    Structured logging with correlation
Metrics    Prometheus metrics and dashboards
Tracing    Distributed tracing with OpenTelemetry
Alerting   Alert design and on-call practices

Quick Setup

Structured Logging

import structlog

structlog.configure(
    processors=[
        structlog.stdlib.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)

log = structlog.get_logger()
log.info("user_login", user_id=123, ip="192.168.1.1")
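
For services that cannot add structlog as a dependency, the same one-JSON-object-per-line idea can be approximated with the standard library. This is a minimal sketch using `logging` rather than structlog; `JsonFormatter`, `build_logger`, and the `fields` key are hypothetical names chosen for illustration.

```python
import io
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object (hypothetical helper)."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "event": record.getMessage(),
            "level": record.levelname.lower(),
            "timestamp": self.formatTime(record, "%Y-%m-%dT%H:%M:%S"),
        }
        # Key-value pairs passed as extra={"fields": {...}} are merged in.
        payload.update(getattr(record, "fields", {}))
        return json.dumps(payload)


def build_logger(stream) -> logging.Logger:
    """Return a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("json_sketch")
    logger.handlers.clear()
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    return logger


# Write to an in-memory buffer so the output can be inspected.
buffer = io.StringIO()
logger = build_logger(buffer)
logger.info("user_login", extra={"fields": {"user_id": 123, "ip": "192.168.1.1"}})
line = json.loads(buffer.getvalue())
```

Each emitted line parses as JSON, so a log pipeline can filter on `user_id` or `level` without regular expressions.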

Basic Metrics

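import time

from fastapi import FastAPI
from prometheus_client import Counter, Histogram

app = FastAPI()  # assumed: the application object the middleware attaches to

REQUEST_COUNT = Counter('http_requests_total', 'Total requests', ['method', 'endpoint', 'status'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'Request latency', ['endpoint'])

@app.middleware("http")
async def metrics_middleware(request, call_next):
    # perf_counter() is monotonic, so the measured duration cannot go negative
    start = time.perf_counter()
    response = await call_next(request)
    duration = time.perf_counter() - start

    # Raw paths such as /users/123 create one time series per distinct URL;
    # label with the route template instead if paths contain IDs.
    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(endpoint=request.url.path).observe(duration)

    return response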
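
To make concrete what a histogram's `observe()` records, here is a stdlib-only sketch of cumulative buckets. It is not prometheus_client's implementation; `BucketHistogram` and its bucket bounds are hypothetical and chosen only to keep the example small.

```python
from bisect import bisect_left


class BucketHistogram:
    """Toy cumulative histogram (illustrative, not the library's code)."""

    def __init__(self, bounds=(0.1, 0.5, 1.0)):
        self.bounds = tuple(bounds)  # hypothetical upper limits, in seconds
        # One slot per finite bound plus a final +Inf slot.
        self.counts = [0] * (len(self.bounds) + 1)
        self.total = 0.0
        self.observations = 0

    def observe(self, value: float) -> None:
        # Pick the first bucket whose upper bound is >= value.
        self.counts[bisect_left(self.bounds, value)] += 1
        self.total += value
        self.observations += 1

    def cumulative(self) -> dict:
        """Return {upper_bound: number of observations <= upper_bound}."""
        running, out = 0, {}
        for bound, count in zip((*self.bounds, float("inf")), self.counts):
            running += count
            out[bound] = running
        return out


hist = BucketHistogram()
for seconds in (0.05, 0.2, 0.2, 0.7, 3.0):
    hist.observe(seconds)
buckets = hist.cumulative()
```

Because only bucket counts, a sum, and a count are stored, memory stays constant no matter how many requests are observed; the price is that percentiles can only be estimated to bucket resolution.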

OpenTelemetry

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Register the SDK provider once at startup, before any tracer is used
provider = TracerProvider()
# Batch finished spans and send them over OTLP/gRPC; with no arguments the
# exporter targets a local collector (localhost:4317 by default)
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

# Each call to process_order runs inside its own "process_order" span
@tracer.start_as_current_span("process_order")
def process_order(order_id: int):
    span = trace.get_current_span()
    span.set_attribute("order.id", order_id)
    # Processing logic...

Key Metrics to Monitor

Application

Metric                          Type        Description
http_requests_total             Counter     Total requests by endpoint/status
http_request_duration_seconds   Histogram   Request latency
http_requests_in_progress       Gauge       Current active requests
errors_total                    Counter     Errors by type

Database

Metric                      Type        Description
db_query_duration_seconds   Histogram   Query latency
db_connections_active       Gauge       Active connections
db_connections_idle         Gauge       Idle connections

Infrastructure

Metric               Type    Description
cpu_usage_percent    Gauge   CPU utilization
memory_usage_bytes   Gauge   Memory usage
disk_usage_percent   Gauge   Disk utilization

Dashboard Essentials

RED Method (Request-focused)

  • Rate — Requests per second
  • Errors — Error rate
  • Duration — Latency percentiles

USE Method (Resource-focused)

  • Utilization — % of resource used
  • Saturation — Work queued
  • Errors — Error count
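
As a rough illustration of the RED method, the three numbers can be computed from raw request samples. The data, the window length, and the choice to count only 5xx responses as errors are assumptions made for this sketch; `red_summary` is a hypothetical helper, and in practice a metrics backend would do this aggregation.

```python
from statistics import quantiles

# Hypothetical sample: (latency in seconds, HTTP status) seen in a 10 s window.
WINDOW_SECONDS = 10
samples = [(0.10, 200), (0.20, 200), (0.30, 500), (0.40, 200), (0.50, 404)]


def red_summary(samples, window_seconds):
    """Compute Rate, Errors, and Duration figures for one time window."""
    latencies = sorted(latency for latency, _ in samples)
    # Assumption: only server-side failures (5xx) count as errors.
    errors = sum(1 for _, status in samples if status >= 500)
    # 99 cut points; index 49 is the 50th percentile, index 94 the 95th.
    cuts = quantiles(latencies, n=100, method="inclusive")
    return {
        "rate_per_s": len(samples) / window_seconds,
        "error_ratio": errors / len(samples),
        "p50_s": cuts[49],
        "p95_s": cuts[94],
    }


summary = red_summary(samples, WINDOW_SECONDS)
```

Reporting percentiles rather than a mean keeps a few slow requests visible instead of averaging them away.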

Correlation IDs

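Attach one ID to everything a request produces so that its logs and traces can be found together. The middleware below reuses the caller's X-Request-ID header when present and generates a UUID otherwise.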

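import uuid
from contextvars import ContextVar

# Holds the current request's ID; each concurrent request sees its own value
request_id: ContextVar[str] = ContextVar("request_id", default="")

@app.middleware("http")
async def correlation_middleware(request, call_next):
    # Get or create request ID (`or` also replaces an empty header value)
    req_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
    token = request_id.set(req_id)

    try:
        # Add to response
        response = await call_next(request)
        response.headers["X-Request-ID"] = req_id
        return response
    finally:
        # Restore the previous value so the ID cannot leak past this request
        request_id.reset(token)

# Include in logs (call this from code that runs while handling a request)
log.info("processing", request_id=request_id.get())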
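
The reason a `ContextVar` is used rather than a module-level global is that each asyncio task gets its own copy of the context. The following stdlib-only sketch shows two overlapping "requests" that never see each other's ID; `handle` and the IDs are hypothetical stand-ins for real request handling.

```python
import asyncio
from contextvars import ContextVar

request_id: ContextVar[str] = ContextVar("request_id", default="")


async def handle(req_id: str, delay: float) -> str:
    """Simulate one request: set the ID, yield to other tasks, read it back."""
    request_id.set(req_id)
    await asyncio.sleep(delay)  # other tasks run while this one waits
    return request_id.get()


async def main():
    # gather() wraps each coroutine in its own task, and each task runs
    # in a copy of the current context.
    return await asyncio.gather(handle("req-a", 0.02), handle("req-b", 0.01))


results = asyncio.run(main())
```

With a plain global, the second task's assignment would overwrite the first one's value before it was read back.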

Health Checks

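from fastapi import Depends, FastAPI, status
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

# Assumed to exist elsewhere in the application: `app` (the FastAPI instance),
# `get_db` (a dependency yielding an AsyncSession), and `redis` (an async Redis client).

# /health answers whenever the process is up (liveness);
# /health/ready also checks dependencies (readiness).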

@app.get("/health")
async def health():
    return {"status": "healthy"}

@app.get("/health/ready")
async def readiness(db: AsyncSession = Depends(get_db)):
    checks = {}

    # Database
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {e}"

    # Redis
    try:
        await redis.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"

    all_ok = all(v == "ok" for v in checks.values())

    return JSONResponse(
        content={"status": "ready" if all_ok else "not_ready", "checks": checks},
        status_code=status.HTTP_200_OK if all_ok else status.HTTP_503_SERVICE_UNAVAILABLE
    )
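
A readiness endpoint that waits on a hung dependency can itself time out. One possible refinement, sketched here with the standard library only, is to run the checks concurrently and bound each with a timeout. `run_check`, `run_checks`, the fake check functions, and the `queue` dependency are hypothetical; a real service would pass in its own database and Redis probes.

```python
import asyncio


async def run_check(name, check, timeout_s=1.0):
    """Run one async check and classify the outcome as ok/timeout/error."""
    try:
        await asyncio.wait_for(check(), timeout=timeout_s)
        return name, "ok"
    except asyncio.TimeoutError:
        return name, "timeout"
    except Exception as exc:
        # Report only the exception type to avoid leaking internal details.
        return name, f"error: {type(exc).__name__}"


async def run_checks(checks, timeout_s=1.0):
    """Run all checks concurrently and return {name: result}."""
    pairs = await asyncio.gather(
        *(run_check(name, check, timeout_s) for name, check in checks.items())
    )
    return dict(pairs)


# Fake dependencies standing in for real probes.
async def _ok():
    return None


async def _slow():
    await asyncio.sleep(0.5)


async def _broken():
    raise ConnectionError("refused")


report = asyncio.run(
    run_checks({"database": _ok, "redis": _slow, "queue": _broken}, timeout_s=0.05)
)
all_ok = all(result == "ok" for result in report.values())
```

The resulting `report` and `all_ok` have the same shape as `checks` and `all_ok` in the handler above, so they could feed the same `JSONResponse`.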