Distributed Tracing¶
Track requests across services with OpenTelemetry.
Concepts¶
Trace¶
A trace represents the entire journey of a request through your system.
Trace: abc123
├─ Span: HTTP POST /orders (120ms)
│ ├─ Span: validate_order (5ms)
│ ├─ Span: db.insert orders (15ms)
│ ├─ Span: HTTP POST payment-service/charge (80ms)
│ │ └─ Span: stripe.create_charge (70ms)
│ └─ Span: send_confirmation_email (10ms)
Span¶
A unit of work within a trace. A span contains:
- Operation name
- Start/end time
- Parent span (if any)
- Attributes (key-value data)
- Events (timestamped logs)
- Status
OpenTelemetry Setup¶
Installation¶
# Core API and SDK, the OTLP exporter, and the auto-instrumentation
# packages for FastAPI, SQLAlchemy and HTTPX.
pip install \
    opentelemetry-api \
    opentelemetry-sdk \
    opentelemetry-exporter-otlp \
    opentelemetry-instrumentation-fastapi \
    opentelemetry-instrumentation-sqlalchemy \
    opentelemetry-instrumentation-httpx
Configuration¶
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
# Resource attributes describing this service.
service_attributes = {
    "service.name": "api",
    "service.version": "1.0.0",
    "deployment.environment": "production",
}
resource = Resource.create(service_attributes)

# Tracer provider built on that resource.
provider = TracerProvider(resource=resource)

# Export spans in batches over OTLP/gRPC (insecure=True: no TLS to the endpoint).
otlp_exporter = OTLPSpanExporter(endpoint="http://jaeger:4317", insecure=True)
span_processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(span_processor)

# Register the provider as the global tracer provider.
trace.set_tracer_provider(provider)
Auto-Instrumentation¶
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
# FastAPI: instrument the existing application object.
FastAPIInstrumentor.instrument_app(app)

# SQLAlchemy: instrument the sync engine exposed by `engine`
# (presumably an AsyncEngine, given `.sync_engine` -- confirm against your setup).
sqlalchemy_instrumentor = SQLAlchemyInstrumentor()
sqlalchemy_instrumentor.instrument(engine=engine.sync_engine)

# HTTPX: instrument the client library.
httpx_instrumentor = HTTPXClientInstrumentor()
httpx_instrumentor.instrument()
Manual Instrumentation¶
Creating Spans¶
from opentelemetry import trace
tracer = trace.get_tracer(__name__)
@tracer.start_as_current_span("process_order")
def process_order(order_id: int):
    """Process an order inside a "process_order" span.

    The decorator opens the span for the duration of the call. The body
    tags that span with the order id, then runs validation, payment and
    confirmation in sequence.
    """
    # The span opened by the decorator is the current span here.
    trace.get_current_span().set_attribute("order.id", order_id)

    validate_order(order_id)
    charge_payment(order_id)
    send_confirmation(order_id)
@tracer.start_as_current_span("validate_order")
def validate_order(order_id: int):
    """Validate an order inside a "validate_order" span (placeholder body)."""
    # Validation logic...
    return None
Adding Attributes¶
def process_order(order: Order):
    """Record the order's key details as attributes on the current span."""
    current_span = trace.get_current_span()

    # Order details; the total is converted to float before being stored.
    current_span.set_attribute("order.id", order.id)
    current_span.set_attribute("order.total", float(order.total))
    current_span.set_attribute("order.items_count", len(order.items))

    # Customer the order belongs to.
    current_span.set_attribute("customer.id", order.customer_id)
Adding Events¶
def process_payment(order_id: int, amount: float):
    """Charge the card, recording the payment lifecycle as span events.

    Adds a "payment_started" event before charging. On success adds
    "payment_completed" with the transaction id. When PaymentError is
    raised, adds "payment_failed" with the error text and re-raises.
    """
    current_span = trace.get_current_span()

    started_attrs = {
        "order_id": order_id,
        "amount": amount,
    }
    current_span.add_event("payment_started", started_attrs)

    try:
        charge = charge_card(amount)
        completed_attrs = {"transaction_id": charge.transaction_id}
        current_span.add_event("payment_completed", completed_attrs)
    except PaymentError as exc:
        failed_attrs = {"error": str(exc)}
        current_span.add_event("payment_failed", failed_attrs)
        raise
Setting Status¶
from opentelemetry.trace import Status, StatusCode
def risky_operation():
    """Run perform_operation() and mirror its outcome on the current span.

    Returns:
        The value returned by perform_operation(), after the span status
        has been set to OK.

    Raises:
        Exception: whatever perform_operation() raised, re-raised after the
        span is marked ERROR (with the exception text) and the exception is
        recorded on the span.
    """
    current_span = trace.get_current_span()
    try:
        outcome = perform_operation()
        ok_status = Status(StatusCode.OK)
        current_span.set_status(ok_status)
        return outcome
    except Exception as exc:
        error_status = Status(StatusCode.ERROR, str(exc))
        current_span.set_status(error_status)
        current_span.record_exception(exc)
        raise
Context Propagation¶
HTTP Headers¶
from opentelemetry.propagate import inject, extract
from opentelemetry import context
# Outgoing request: inject context
async def call_service(url: str, data: dict):
    """POST ``data`` as JSON to ``url``, carrying the trace context in headers."""
    trace_headers = {}
    inject(trace_headers)  # Adds traceparent, tracestate headers

    async with httpx.AsyncClient() as http_client:
        response = await http_client.post(url, json=data, headers=trace_headers)
    return response
# Incoming request: extract context
@app.middleware("http")
async def trace_middleware(request: Request, call_next):
    """Run the downstream handler under the trace context sent by the caller.

    Args:
        request: Incoming request whose headers may carry propagation data.
        call_next: Callable that passes the request on and returns the response.

    Returns:
        The response produced by ``call_next``.
    """
    ctx = extract(request.headers)
    # context.attach() returns a token, not a context manager, so it cannot
    # be used in a `with` statement. Pair it with detach() in try/finally so
    # the previous context is restored even if the handler raises.
    token = context.attach(ctx)
    try:
        return await call_next(request)
    finally:
        context.detach(token)
Async Context¶
import asyncio
from opentelemetry import context
async def async_operation():
    """Run both child operations concurrently inside a "parent" span."""
    # Context automatically propagated in asyncio
    with tracer.start_as_current_span("parent"):
        # Child spans inherit parent
        children = (
            child_operation_1(),
            child_operation_2(),
        )
        await asyncio.gather(*children)
async def child_operation_1():
    """Open a "child_1" span around a placeholder body."""
    with tracer.start_as_current_span("child_1"):
        # This is a child of "parent"
        return None
FastAPI Integration¶
Complete Setup¶
from fastapi import FastAPI, Request
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
# Application object that the route handlers are registered on.
app = FastAPI()
# Auto-instrument the app; this takes the instance, so it must run after `app` is created.
FastAPIInstrumentor.instrument_app(app)
# Module-level tracer used to open the manual spans in the handlers.
tracer = trace.get_tracer(__name__)
@app.post("/orders")
async def create_order(order: OrderCreate):
    """Handle POST /orders inside a "create_order" span.

    Records the item count on the span, validates and saves the order, then
    records the saved order's id and returns the saved result.
    """
    with tracer.start_as_current_span("create_order") as order_span:
        item_count = len(order.items)
        order_span.set_attribute("order.items", item_count)

        # Business logic with nested spans
        saved = await save_order(await validate_order(order))

        order_span.set_attribute("order.id", saved.id)
        return saved
async def validate_order(order: OrderCreate):
    """Validate the order inside a "validate_order" span and return it as-is (placeholder)."""
    with tracer.start_as_current_span("validate_order"):
        # Validation...
        validated = order
    return validated
async def save_order(order: OrderCreate):
    """Persist the order inside a "save_order" span (placeholder; returns None)."""
    with tracer.start_as_current_span("save_order"):
        # Database operation...
        # NOTE(review): create_order reads `.id` from this function's result,
        # so a real implementation needs to return the saved record.
        return None
Viewing Traces¶
Jaeger¶
# docker-compose.yml
services:
  jaeger:
    image: jaegertracing/all-in-one:latest
    ports:
      - "16686:16686"  # UI
      - "4317:4317"    # OTLP gRPC
      - "4318:4318"    # OTLP HTTP
    environment:
      # Quoted so YAML keeps the value a string rather than parsing it as a
      # boolean; Compose expects boolean-like environment values to be quoted.
      COLLECTOR_OTLP_ENABLED: "true"
Access the UI at http://localhost:16686.
Grafana Tempo¶
services:
  tempo:
    image: grafana/tempo:latest
    # Start Tempo with the configuration file mounted under `volumes`.
    command:
      - "-config.file=/etc/tempo.yaml"
    ports:
      # Presumably the OTLP gRPC receiver; confirm against tempo.yaml.
      - "4317:4317"
    volumes:
      - ./tempo.yaml:/etc/tempo.yaml
Best Practices¶
- Name spans clearly — `service.operation` pattern
- Add meaningful attributes — IDs, counts, relevant data
- Don't over-trace — Focus on boundaries and slow operations
- Propagate context — Across HTTP, message queues, async
- Set status correctly — OK, ERROR with description
- Use sampling — Not every request needs tracing
Sampling¶
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBasedTraceIdRatio
# Sample 10% of traces
SAMPLE_RATIO = 0.1
sampler = ParentBasedTraceIdRatio(SAMPLE_RATIO)

# Build the provider with the sampler attached; `resource` is the Resource
# created in the configuration step.
provider = TracerProvider(sampler=sampler, resource=resource)
Connecting Logs and Traces¶
import structlog
from opentelemetry import trace
def get_trace_context():
    """Return the active trace and span ids as hex strings for log correlation.

    Returns:
        A dict with "trace_id" (32 hex digits) and "span_id" (16 hex digits),
        or an empty dict when there is no valid span context.
    """
    ctx = trace.get_current_span().get_span_context()
    # get_current_span() returns an invalid placeholder span rather than None
    # when no span is active, so a truthiness test on the span always passes
    # and would put all-zero ids in the logs. Check the context's validity.
    if not ctx.is_valid:
        return {}
    return {
        "trace_id": format(ctx.trace_id, "032x"),
        "span_id": format(ctx.span_id, "016x"),
    }
# Add to structlog
def add_trace_context(_logger, _method_name, event_dict):
    """structlog processor: merge the result of get_trace_context() into the event dict."""
    return {**event_dict, **get_trace_context()}


structlog.configure(
    processors=[
        add_trace_context,
        # ... other processors
    ]
)