Cookbook¶
Step-by-step recipes for common backend tasks. Each recipe shows the complete vertical slice from model to route.
Recipe 1: Add a New CRUD Resource¶
Goal: Create a Product resource with full CRUD endpoints.
Step 1 — SQLAlchemy model:
# app/models/product.py
from decimal import Decimal
class Product(TimestampMixin, Base):
__tablename__ = "products"
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
name: Mapped[str] = mapped_column(String(200))
price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
is_active: Mapped[bool] = mapped_column(default=True)
Step 2 — Pydantic schemas:
# app/schemas/product.py
class ProductCreate(BaseModel):
name: str = Field(min_length=1, max_length=200)
price: Decimal = Field(gt=0)
class ProductUpdate(BaseModel):
name: str | None = Field(None, min_length=1, max_length=200)
price: Decimal | None = Field(None, gt=0)
class ProductResponse(BaseModel):
id: UUID
name: str
price: Decimal
is_active: bool
created_at: datetime
model_config = ConfigDict(from_attributes=True)
Step 3 — Repository:
# app/repositories/product.py
class ProductRepository(BaseRepository[Product]):
async def get_active(self) -> list[Product]:
stmt = select(Product).where(Product.is_active.is_(True))
result = await self.session.execute(stmt)
return list(result.scalars().all())
Step 4 — Service:
# app/services/product.py
class ProductService:
def __init__(self, repo: ProductRepository):
self.repo = repo
async def create_product(self, data: ProductCreate) -> Product:
return await self.repo.create(data.model_dump())
async def get_product(self, product_id: UUID) -> Product:
product = await self.repo.get(product_id)
if not product:
raise NotFoundError("Product", product_id)
return product
Step 5 — Router:
# app/api/v1/products.py
router = APIRouter(prefix="/api/v1/products", tags=["products"])
@router.post("/", response_model=ProductResponse, status_code=201)
async def create_product(
product_in: ProductCreate,
service: ProductService = Depends(get_product_service),
):
return await service.create_product(product_in)
@router.get("/{product_id}", response_model=ProductResponse)
async def get_product(
product_id: UUID,
service: ProductService = Depends(get_product_service),
):
return await service.get_product(product_id)
Step 6 — Migration:
Recipe 2: Add a Filtered List with Pagination¶
Goal: List products with search, price range filters, and pagination.
Step 1 — Repository method:
async def get_filtered(
self,
offset: int,
limit: int,
search: str | None = None,
min_price: Decimal | None = None,
max_price: Decimal | None = None,
) -> tuple[list[Product], int]:
stmt = select(Product).where(Product.is_active.is_(True))
if search:
stmt = stmt.where(Product.name.ilike(f"%{search}%"))
if min_price is not None:
stmt = stmt.where(Product.price >= min_price)
if max_price is not None:
stmt = stmt.where(Product.price <= max_price)
# Count total matching rows
count_stmt = select(func.count()).select_from(stmt.subquery())
total = await self.session.scalar(count_stmt)
# Apply pagination
stmt = stmt.order_by(Product.created_at.desc()).offset(offset).limit(limit)
result = await self.session.execute(stmt)
return list(result.scalars().all()), total
Step 2 — Service method:
async def list_products(
self, page: int, size: int, **filters
) -> Page[Product]:
items, total = await self.repo.get_filtered(
offset=(page - 1) * size, limit=size, **filters
)
return Page.create(items=items, total=total, page=page, size=size)
Step 3 — Route:
@router.get("/", response_model=Page[ProductResponse])
async def list_products(
page: int = Query(1, ge=1),
size: int = Query(20, ge=1, le=100),
search: str | None = Query(None, max_length=200),
min_price: Decimal | None = Query(None, ge=0),
max_price: Decimal | None = Query(None, ge=0),
service: ProductService = Depends(get_product_service),
):
return await service.list_products(
page=page, size=size, search=search, min_price=min_price, max_price=max_price
)
Recipe 3: Protect an Endpoint with Authentication¶
Goal: Make an endpoint require a logged-in user with a specific permission.
# Public endpoint — no auth
@router.get("/products", response_model=Page[ProductResponse])
async def list_products(service: ProductService = Depends(get_product_service)):
return await service.list_products(page=1, size=20)
# Requires authentication
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_active_user)):
return current_user
# Requires specific permission
@router.delete("/products/{product_id}", status_code=204)
async def delete_product(
product_id: UUID,
service: ProductService = Depends(get_product_service),
_: User = Depends(requires_permission("products:delete")),
):
await service.delete_product(product_id)
Recipe 4: Cross-Service Workflow with Domain Events¶
Goal: When a user is created, send a welcome email without coupling UserService to EmailService.
Step 1 — Event schema:
Step 2 — Publish in service:
class UserService:
async def create_user(self, data: UserCreate) -> User:
user = await self.repo.create(data.model_dump())
await self.event_bus.publish(
"user.created",
UserCreatedEvent(
user_id=user.id,
email=user.email,
name=user.name,
created_at=user.created_at,
),
)
return user
Step 3 — Subscribe in consumer:
@event_bus.subscribe("user.created")
async def on_user_created(event: UserCreatedEvent):
await email_service.send_welcome_email(
to=event.email, name=event.name
)
await analytics_service.track("user_signup", user_id=event.user_id)
Recipe 5: Add a Background Task¶
Goal: Generate a PDF report and email it to the user without blocking the request.
Quick task with BackgroundTasks:
@router.post("/reports/generate")
async def generate_report(
background_tasks: BackgroundTasks,
current_user: User = Depends(get_current_active_user),
service: ReportService = Depends(get_report_service),
):
report_id = await service.create_report_record(current_user.id)
background_tasks.add_task(service.generate_and_email, report_id, current_user.email)
return {"report_id": report_id, "status": "processing"}
Long-running task with Celery:
@celery_app.task(bind=True, max_retries=3, default_retry_delay=120)
def generate_large_report(self, report_id: str):
try:
pdf = build_pdf(report_id) # May take minutes
upload_to_s3(pdf)
send_email_notification(report_id)
except TransientError as exc:
raise self.retry(exc=exc)
Recipe 6: Create an Alembic Migration¶
Goal: Add a new column to an existing table.
Step 1 — Update the model:
class User(TimestampMixin, Base):
__tablename__ = "users"
# ... existing columns ...
phone: Mapped[str | None] = mapped_column(String(20), default=None)
Step 2 — Generate migration:
Step 3 — Review the generated file:
def upgrade() -> None:
op.add_column("users", sa.Column("phone", sa.String(20), nullable=True))
def downgrade() -> None:
op.drop_column("users", "phone")
Step 4 — Apply:
Always verify auto-generated migrations. For data migrations, write manual migration files.