Skip to content

Cookbook

Step-by-step recipes for common backend tasks. Each recipe shows the complete vertical slice from model to route.


Recipe 1: Add a New CRUD Resource

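Goal: Create a Product resource with CRUD endpoints. The steps below show the create and read paths; update and delete follow the same model → schema → repository → service → route pattern.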

Step 1 — SQLAlchemy model:

# app/models/product.py
from decimal import Decimal

class Product(TimestampMixin, Base):
    """ORM model mapped to the ``products`` table.

    Also inherits from ``TimestampMixin``. ``ProductResponse`` reads a
    ``created_at`` attribute that is not declared here, so it presumably
    comes from that mixin (its definition is not shown) -- confirm.
    """

    __tablename__ = "products"

    # Primary key; uuid4 is called to supply a value when none is given.
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    # Product name, stored in a 200-character string column.
    name: Mapped[str] = mapped_column(String(200))
    # Fixed-point price: precision 10, scale 2.
    price: Mapped[Decimal] = mapped_column(Numeric(10, 2))
    # New products are active unless stated otherwise.
    is_active: Mapped[bool] = mapped_column(default=True)

Step 2 — Pydantic schemas:

# app/schemas/product.py
# Request body for creating a product.
class ProductCreate(BaseModel):
    # Required, 1-200 characters (the column is String(200)).
    name: str = Field(min_length=1, max_length=200)
    # Strictly positive, and limited to what the Numeric(10, 2) column can
    # hold, so out-of-range or over-precise values are rejected at the API
    # boundary instead of failing or being rounded in the database.
    price: Decimal = Field(gt=0, max_digits=10, decimal_places=2)

# Request body for a partial update: every field is optional and defaults
# to None.
class ProductUpdate(BaseModel):
    # When provided, 1-200 characters (the column is String(200)).
    name: str | None = Field(None, min_length=1, max_length=200)
    # When provided, strictly positive and limited to what the
    # Numeric(10, 2) column can hold, so over-precise or oversized values
    # are rejected at the API boundary rather than in the database.
    price: Decimal | None = Field(None, gt=0, max_digits=10, decimal_places=2)

# Shape of a product as returned by the API.
class ProductResponse(BaseModel):
    id: UUID
    name: str
    price: Decimal
    is_active: bool
    # Not declared on Product itself; presumably provided by TimestampMixin
    # -- confirm against the mixin.
    created_at: datetime
    # from_attributes lets Pydantic populate this schema from an object's
    # attributes (such as an ORM instance) rather than only from a dict.
    model_config = ConfigDict(from_attributes=True)

Step 3 — Repository:

# app/repositories/product.py
class ProductRepository(BaseRepository[Product]):
    """Product-specific queries layered on the generic ``BaseRepository``."""

    async def get_active(self) -> list[Product]:
        """Return every product whose ``is_active`` flag is true."""
        only_active = Product.is_active.is_(True)
        rows = await self.session.execute(select(Product).where(only_active))
        return list(rows.scalars().all())

Step 4 — Service:

# app/services/product.py
class ProductService:
    """Product use cases, delegating persistence to a ``ProductRepository``."""

    def __init__(self, repo: ProductRepository):
        self.repo = repo

    async def create_product(self, data: ProductCreate) -> Product:
        """Create a product from the validated payload and return it."""
        payload = data.model_dump()
        created = await self.repo.create(payload)
        return created

    async def get_product(self, product_id: UUID) -> Product:
        """Return the product with the given id.

        Raises:
            NotFoundError: If the repository returns nothing for
                ``product_id``.
        """
        found = await self.repo.get(product_id)
        if found:
            return found
        raise NotFoundError("Product", product_id)

Step 5 — Router:

# app/api/v1/products.py
# Router for the product endpoints. The prefix is prepended to the path of
# every route registered on it, and the tag groups them in the API docs.
router = APIRouter(prefix="/api/v1/products", tags=["products"])

# Create a product from the request body. Responds with status 201 and the
# new product serialized as ProductResponse.
@router.post("/", response_model=ProductResponse, status_code=201)
async def create_product(
    product_in: ProductCreate,
    service: ProductService = Depends(get_product_service),
):
    created = await service.create_product(product_in)
    return created

# Fetch a single product by the UUID in the path. Any error raised by the
# service for an unknown id propagates to the framework's handlers.
@router.get("/{product_id}", response_model=ProductResponse)
async def get_product(
    product_id: UUID,
    service: ProductService = Depends(get_product_service),
):
    product = await service.get_product(product_id)
    return product

Step 6 — Migration:

# Generate a migration script by comparing the models with the database schema.
alembic revision --autogenerate -m "Add products table"
# Apply all pending migrations up to the latest revision.
alembic upgrade head

Recipe 2: Add a Filtered List with Pagination

Goal: List products with search, price range filters, and pagination.

Step 1 — Repository method:

async def get_filtered(
    self,
    offset: int,
    limit: int,
    search: str | None = None,
    min_price: Decimal | None = None,
    max_price: Decimal | None = None,
) -> tuple[list[Product], int]:
    """Return one page of active products and the total number of matches.

    Args:
        offset: Number of matching rows to skip.
        limit: Maximum number of rows to return.
        search: Optional case-insensitive substring to look for in the
            product name. The text is matched literally: ``%`` and ``_``
            in the input are escaped rather than treated as wildcards.
        min_price: Optional inclusive lower bound on price.
        max_price: Optional inclusive upper bound on price.

    Returns:
        ``(items, total)``: ``items`` is the requested page ordered by
        ``created_at`` descending, and ``total`` counts all rows matching
        the filters regardless of ``offset``/``limit``.
    """
    stmt = select(Product).where(Product.is_active.is_(True))

    if search:
        # Escape LIKE metacharacters so user input cannot widen the match.
        # The escape character itself must be doubled first.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        stmt = stmt.where(Product.name.ilike(f"%{escaped}%", escape="/"))
    if min_price is not None:
        stmt = stmt.where(Product.price >= min_price)
    if max_price is not None:
        stmt = stmt.where(Product.price <= max_price)

    # Count total matching rows (before ordering and pagination).
    count_stmt = select(func.count()).select_from(stmt.subquery())
    # scalar() can return None; keep the declared int return type.
    total = (await self.session.scalar(count_stmt)) or 0

    # Apply pagination. The primary key is a tie-breaker so rows that share
    # a created_at value keep a stable order from one page to the next.
    stmt = (
        stmt.order_by(Product.created_at.desc(), Product.id)
        .offset(offset)
        .limit(limit)
    )
    result = await self.session.execute(stmt)

    return list(result.scalars().all()), total

Step 2 — Service method:

async def list_products(
    self, page: int, size: int, **filters
) -> Page[Product]:
    """Return the requested page of products.

    Args:
        page: 1-based page number; converted to a row offset of
            ``(page - 1) * size``.
        size: Number of items per page, also used as the query limit.
        **filters: Passed through unchanged to ``repo.get_filtered``.
    """
    skip = (page - 1) * size
    rows, match_count = await self.repo.get_filtered(
        offset=skip, limit=size, **filters
    )
    return Page.create(items=rows, total=match_count, page=page, size=size)

Step 3 — Route:

# List products with optional name search, price bounds, and pagination.
# Query constraints: page >= 1, 1 <= size <= 100, search up to 200
# characters, prices >= 0.
# NOTE(review): nothing here rejects min_price > max_price; such a request
# is passed to the service as-is.
@router.get("/", response_model=Page[ProductResponse])
async def list_products(
    page: int = Query(1, ge=1),
    size: int = Query(20, ge=1, le=100),
    search: str | None = Query(None, max_length=200),
    min_price: Decimal | None = Query(None, ge=0),
    max_price: Decimal | None = Query(None, ge=0),
    service: ProductService = Depends(get_product_service),
):
    filters = {"search": search, "min_price": min_price, "max_price": max_price}
    return await service.list_products(page=page, size=size, **filters)

Recipe 3: Protect an Endpoint with Authentication

Goal: Make an endpoint require a logged-in user with a specific permission.

# Public endpoint — no auth
# No authentication dependency is declared, so any caller can reach this.
# Always returns the first page of 20 products.
@router.get("/products", response_model=Page[ProductResponse])
async def list_products(service: ProductService = Depends(get_product_service)):
    first_page = await service.list_products(page=1, size=20)
    return first_page


# Requires authentication
# Returns the user produced by the get_current_active_user dependency,
# serialized as UserResponse. The handler adds no logic of its own; access
# control lives entirely in that dependency.
@router.get("/me", response_model=UserResponse)
async def get_me(current_user: User = Depends(get_current_active_user)):
    return current_user


# Requires specific permission
# Delete a product by id. Responds with 204, so nothing is returned.
@router.delete("/products/{product_id}", status_code=204)
async def delete_product(
    product_id: UUID,
    service: ProductService = Depends(get_product_service),
    # Declared only so the permission dependency runs; the resolved value
    # is never used, hence the throwaway name.
    _: User = Depends(requires_permission("products:delete")),
):
    await service.delete_product(product_id)

Recipe 4: Cross-Service Workflow with Domain Events

Goal: When a user is created, send a welcome email without coupling UserService to EmailService.

Step 1 — Event schema:

# Payload published under the "user.created" topic. It carries the fields
# the subscriber reads (email, name, user_id), so the consumer does not need
# to call back into the user service.
class UserCreatedEvent(BaseModel):
    user_id: UUID
    email: EmailStr
    name: str
    created_at: datetime

Step 2 — Publish in service:

class UserService:
    """User use cases; relies on ``self.repo`` and ``self.event_bus``."""

    async def create_user(self, data: UserCreate) -> User:
        """Create a user and publish a ``user.created`` event for it.

        NOTE(review): the event is published right after ``repo.create``
        returns. Whether the row is already committed at that point is not
        visible here -- confirm before relying on it in consumers.
        """
        new_user = await self.repo.create(data.model_dump())

        created_event = UserCreatedEvent(
            user_id=new_user.id,
            email=new_user.email,
            name=new_user.name,
            created_at=new_user.created_at,
        )
        await self.event_bus.publish("user.created", created_event)

        return new_user

Step 3 — Subscribe in consumer:

@event_bus.subscribe("user.created")
async def on_user_created(event: UserCreatedEvent):
    """Send the welcome email, then record the signup in analytics.

    The two steps run sequentially, so an exception from the email step
    prevents the analytics call from running.
    """
    recipient, display_name = event.email, event.name
    await email_service.send_welcome_email(to=recipient, name=display_name)
    await analytics_service.track("user_signup", user_id=event.user_id)

Recipe 5: Add a Background Task

Goal: Generate a PDF report and email it to the user without blocking the request.

Quick task with BackgroundTasks:

# Create the report record, queue the generation work, and answer
# immediately with the report id and a "processing" status.
@router.post("/reports/generate")
async def generate_report(
    background_tasks: BackgroundTasks,
    current_user: User = Depends(get_current_active_user),
    service: ReportService = Depends(get_report_service),
):
    report_id = await service.create_report_record(current_user.id)
    # Queued here; BackgroundTasks runs it after the response is sent.
    # NOTE(review): the task calls a method on the request-scoped service.
    # If get_report_service releases resources when the request ends, they
    # may be gone by the time the task runs -- confirm.
    background_tasks.add_task(
        service.generate_and_email, report_id, current_user.email
    )
    response_body = {"report_id": report_id, "status": "processing"}
    return response_body

Long-running task with Celery:

@celery_app.task(bind=True, max_retries=3, default_retry_delay=120)
def generate_large_report(self, report_id: str):
    """Build the report PDF, upload it, and send the notification email.

    Registered with ``bind=True``, so ``self`` is the task instance and
    ``self.retry`` is available. Only ``TransientError`` leads to a retry
    (configured for at most 3 retries, 120 seconds apart by default); any
    other exception propagates unchanged.
    """
    try:
        pdf = build_pdf(report_id)  # May take minutes
        upload_to_s3(pdf)
        send_email_notification(report_id)
    except TransientError as exc:
        # All three steps sit inside the try, so a retry re-runs the whole
        # sequence from build_pdf, not just the step that failed.
        raise self.retry(exc=exc)

Recipe 6: Create an Alembic Migration

Goal: Add a new column to an existing table.

Step 1 — Update the model:

class User(TimestampMixin, Base):
    """ORM model mapped to the ``users`` table (other columns elided)."""

    __tablename__ = "users"
    # ... existing columns ...
    # New optional column: up to 20 characters, None when not provided.
    # The generated migration below adds it as nullable.
    phone: Mapped[str | None] = mapped_column(String(20), default=None)

Step 2 — Generate migration:

# Generate a migration script by comparing the models with the database schema.
alembic revision --autogenerate -m "Add phone column to users"

Step 3 — Review the generated file:

def upgrade() -> None:
    """Add the nullable ``phone`` column (``String(20)``) to ``users``."""
    phone_column = sa.Column("phone", sa.String(20), nullable=True)
    op.add_column("users", phone_column)

def downgrade() -> None:
    """Reverse ``upgrade`` by dropping the ``phone`` column from ``users``.

    Any values stored in the column are lost when this runs.
    """
    op.drop_column("users", "phone")

Step 4 — Apply:

# Apply all pending migrations up to the latest revision.
alembic upgrade head

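Always review auto-generated migrations before applying them: autogenerate does not detect every change (for example, a renamed column is emitted as a drop plus an add, which would lose data). For data migrations, write the migration file by hand.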