Database Patterns (SQLAlchemy Async)¶
Engine and Session Configuration¶
from sqlalchemy.ext.asyncio import (
AsyncSession,
async_sessionmaker,
create_async_engine,
)
engine = create_async_engine(
settings.async_database_url,
echo=False,
pool_pre_ping=True,
pool_size=10,
max_overflow=20,
pool_recycle=3600,
)
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
Model Definition¶
Use mapped_column and Mapped for all columns. Use mixins for shared fields:
from datetime import datetime
from uuid import UUID, uuid4
from sqlalchemy import String, func
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class Base(DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(
server_default=func.now(), onupdate=func.now()
)
class SoftDeleteMixin:
deleted_at: Mapped[datetime | None] = mapped_column(default=None)
@property
def is_deleted(self) -> bool:
return self.deleted_at is not None
class User(TimestampMixin, Base):
__tablename__ = "users"
id: Mapped[UUID] = mapped_column(primary_key=True, default=uuid4)
email: Mapped[str] = mapped_column(String(320), unique=True, index=True)
name: Mapped[str] = mapped_column(String(200))
is_active: Mapped[bool] = mapped_column(default=True)
organizations: Mapped[list["Organization"]] = relationship(
secondary="user_organizations", back_populates="members"
)
Session Lifecycle and Transactions¶
The session is scoped to a single request via the get_db dependency. For explicit transaction control:
# Single operation — auto-commit via session context
async def create_user(self, data: dict) -> User:
user = User(**data)
self.session.add(user)
await self.session.commit()
await self.session.refresh(user)
return user
# Multiple operations in one transaction
async def transfer_ownership(self, from_id: UUID, to_id: UUID, org_id: UUID) -> None:
async with self.session.begin():
await self.session.execute(
update(Membership)
.where(Membership.user_id == from_id, Membership.org_id == org_id)
.values(role="member")
)
await self.session.execute(
update(Membership)
.where(Membership.user_id == to_id, Membership.org_id == org_id)
.values(role="owner")
)
MediaResource Pattern¶
MediaResource uses a polymorphic attachment model rather than direct foreign key relationships. Entity models declare a media_entity_type class attribute to register with the system:
class Product(BaseModel, table=True):
media_entity_type = MediaEntityType.PRODUCT
# ... other fields
Media data is queried via MediaResourceCRUD.get_media_for_entity(entity_type, entity_id) rather than SQLAlchemy relationships, since the MediaResourceAttachment table uses entity_type + entity_id instead of per-entity foreign keys. For batch queries (list endpoints), use get_media_for_entities_batch() to avoid N+1.
See MediaResource Lifecycle for the full data model.
Relationship Loading (Preventing N+1)¶
Always load relationships explicitly. Never rely on lazy loading in async:
from sqlalchemy.orm import joinedload, selectinload
# joinedload — single JOIN query, best for one-to-one
stmt = select(User).options(joinedload(User.profile)).where(User.id == user_id)
# selectinload — separate IN query, best for one-to-many
stmt = select(Organization).options(
selectinload(Organization.members)
).where(Organization.id == org_id)
# Multiple levels
stmt = select(Organization).options(
selectinload(Organization.members).joinedload(User.profile)
)
Anti-pattern — N+1 queries:
# BAD: triggers a query per user for .organizations
users = (await session.execute(select(User))).scalars().all()
for user in users:
print(user.organizations) # N+1!
# GOOD: eager load upfront
users = (await session.execute(
select(User).options(selectinload(User.organizations))
)).scalars().all()
Bulk Operations¶
from sqlalchemy import insert
# Bulk insert
await session.execute(
insert(AuditLog),
[
{"user_id": uid, "action": "login", "timestamp": now}
for uid in user_ids
],
)
await session.commit()
Alembic Migrations¶
# Create a new migration (auto-detect model changes)
alembic revision --autogenerate -m "Add products table"
# Apply all pending migrations
alembic upgrade head
# Rollback one migration
alembic downgrade -1
# Show current migration state
alembic current
Always review auto-generated migrations before applying. Alembic cannot detect renamed columns, data migrations, or index changes reliably.
See Also¶
- Migrations -- In-depth Alembic migration patterns and safe migration strategies
- Connection Management -- Connection pooling, PgBouncer, and session lifecycle
- Database Patterns -- Soft deletes, audit trails, and full-text search patterns