Skip to content

Database Patterns (SQLAlchemy Async)

Engine and Session Configuration

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

# Application-wide async engine; the connection URL is read from settings.
engine = create_async_engine(
    settings.async_database_url,
    # SQL statement logging is off.
    echo=False,
    # Check that a pooled connection is still alive before handing it out.
    pool_pre_ping=True,
    # Up to 10 connections are kept in the pool ...
    pool_size=10,
    # ... plus up to 20 temporary extra connections under load.
    max_overflow=20,
    # Connections older than 3600 seconds are replaced on next checkout.
    pool_recycle=3600,
)

# Factory that produces AsyncSession objects bound to the engine.
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    # Do not expire loaded attributes on commit, so objects can still be read
    # after commit without triggering an implicit reload.
    expire_on_commit=False,
)

Model Definition

Use mapped_column and Mapped for all columns. Use mixins for shared fields:

from datetime import datetime
from uuid import UUID, uuid4

from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship


class Base(DeclarativeBase):
    """Declarative base class that every ORM model in this module derives from."""


class TimestampMixin:
    """Mixin that adds ``created_at`` and ``updated_at`` timestamp columns."""

    # Filled in by the database on INSERT through the server-side default.
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    # Same server-side default on INSERT; ``onupdate`` applies func.now()
    # again when the row is updated.
    updated_at: Mapped[datetime] = mapped_column(
        server_default=func.now(), onupdate=func.now()
    )


class SoftDeleteMixin:
    """Mixin that marks rows as deleted with a timestamp instead of removing them."""

    # None means the row is live; a datetime records when it was soft-deleted.
    deleted_at: Mapped[datetime | None] = mapped_column(default=None)

    @property
    def is_deleted(self) -> bool:
        """Return True when ``deleted_at`` has been set."""
        return self.deleted_at is not None


class User(TimestampMixin, Base):
    """ORM model for the ``users`` table, with timestamps from TimestampMixin."""

    __tablename__ = "users"

    # Primary key generated client-side with uuid4 when no value is supplied.
    id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
    # Unique, indexed, and limited to 320 characters.
    email: Mapped[str] = mapped_column(String(320), unique=True, index=True)
    name: Mapped[str] = mapped_column(String(200))
    # New users are active unless stated otherwise.
    is_active: Mapped[bool] = mapped_column(default=True)

    # Many-to-many link to Organization through the "user_organizations"
    # association table; the reverse side is Organization.members.
    organizations: Mapped[list["Organization"]] = relationship(
        secondary="user_organizations", back_populates="members"
    )

Session Lifecycle and Transactions

The session is scoped to a single request via the get_db dependency. For explicit transaction control:

# Single operation — explicit commit on the request-scoped session
async def create_user(self, data: dict) -> User:
    """Persist a new ``User`` built from ``data`` and return it.

    Args:
        data: Keyword arguments passed straight to the ``User`` constructor.

    Returns:
        The newly committed ``User``, refreshed from the database.
    """
    db = self.session
    new_user = User(**data)
    db.add(new_user)
    await db.commit()
    # Reload the row so database-generated values are present on the object.
    await db.refresh(new_user)
    return new_user


# Multiple operations in one transaction
async def transfer_ownership(self, from_id: UUID, to_id: UUID, org_id: UUID) -> None:
    """Move the ``owner`` role within an organization from one user to another.

    Both updates run inside a single ``session.begin()`` block, so they are
    committed together or rolled back together.

    NOTE(review): ``session.begin()`` raises if the session already has a
    transaction in progress — confirm nothing else has used this session
    earlier in the request.

    Args:
        from_id: User currently holding the owner role; demoted to ``member``.
        to_id: User who receives the ``owner`` role.
        org_id: Organization whose ownership is transferred.

    Raises:
        ValueError: If ``to_id`` has no membership row in ``org_id``. Raising
            inside the transaction block rolls back the demotion as well, so
            the organization is never left without an owner.
    """
    async with self.session.begin():
        await self.session.execute(
            update(Membership)
            .where(Membership.user_id == from_id, Membership.org_id == org_id)
            .values(role="member")
        )
        promoted = await self.session.execute(
            update(Membership)
            .where(Membership.user_id == to_id, Membership.org_id == org_id)
            .values(role="owner")
        )
        # No matched row means the target user is not a member of this
        # organization; abort so the demotion above is not committed alone.
        if promoted.rowcount == 0:
            raise ValueError(
                f"user {to_id} is not a member of organization {org_id}"
            )

MediaResource Pattern

MediaResource uses a polymorphic attachment model rather than direct foreign key relationships. Entity models declare a media_entity_type class attribute to register with the system:

class Product(BaseModel, table=True):
    # Registers Product with the MediaResource attachment system; attachments
    # refer to it by entity_type + entity_id rather than by a foreign key.
    media_entity_type = MediaEntityType.PRODUCT
    # ... other fields

Media data is queried via MediaResourceCRUD.get_media_for_entity(entity_type, entity_id) rather than SQLAlchemy relationships, since the MediaResourceAttachment table uses entity_type + entity_id instead of per-entity foreign keys. For batch queries (list endpoints), use get_media_for_entities_batch() to avoid N+1 queries.

See MediaResource Lifecycle for the full data model.

Relationship Loading (Preventing N+1)

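Always load relationships explicitly. Never rely on lazy loading in async code — accessing an unloaded relationship triggers implicit I/O, which AsyncSession cannot perform and which raises MissingGreenlet: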

from sqlalchemy.orm import joinedload, selectinload


# joinedload — single JOIN query, best for many-to-one and one-to-one
stmt = select(User).options(joinedload(User.profile)).where(User.id == user_id)

# selectinload — separate IN query, best for one-to-many and many-to-many collections
stmt = select(Organization).options(
    selectinload(Organization.members)
).where(Organization.id == org_id)

# Multiple levels — chain loader options: load each organization's members,
# then each member's profile
stmt = select(Organization).options(
    selectinload(Organization.members).joinedload(User.profile)
)

Anti-pattern — N+1 queries:

# BAD: lazy-loads .organizations once per user (N+1 in sync code). With
# AsyncSession the implicit load cannot be awaited and raises MissingGreenlet.
users = (await session.execute(select(User))).scalars().all()
for user in users:
    print(user.organizations)  # one lazy load per user

# GOOD: eager load upfront — one extra IN query covers every user
users = (await session.execute(
    select(User).options(selectinload(User.organizations))
)).scalars().all()

Bulk Operations

from sqlalchemy import insert


# Bulk insert — pass a list of parameter dicts to a single insert() construct
# instead of adding ORM objects one by one.
# user_ids and now are assumed to be defined by the surrounding code.
await session.execute(
    insert(AuditLog),
    [
        {"user_id": uid, "action": "login", "timestamp": now}
        for uid in user_ids
    ],
)
await session.commit()

Alembic Migrations

# Create a new migration (auto-detect model changes)
alembic revision --autogenerate -m "Add products table"

# Apply all pending migrations
alembic upgrade head

# Roll back the most recent migration
alembic downgrade -1

# Show the current migration state
alembic current

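Always review auto-generated migrations before applying them. Alembic autogenerate cannot reliably detect renamed tables or columns (it emits a drop plus an add), never generates data migrations, and may miss some index, constraint, and type changes.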


See Also