Migrations¶
Master Alembic for safe, reliable database schema changes.
Alembic Basics¶
Project Structure¶
project/
├── alembic.ini              # Configuration (created at the project root)
└── alembic/
    ├── env.py               # Migration environment
    ├── script.py.mako       # Migration template
    └── versions/            # Migration files
        ├── 001_initial.py
        ├── 002_add_users.py
        └── 003_add_bookings.py
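The database URL can be hardcoded in alembic.ini, but a common pattern is to inject it from the environment in env.py so credentials stay out of version control. A minimal sketch, assuming the URL is provided via a DATABASE_URL environment variable (the variable name is an assumption, not an Alembic convention):

# env.py (excerpt)
import os

from alembic import context

config = context.config

# Override whatever alembic.ini says with the deployment's URL
config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])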
Common Commands¶
# Create new migration (auto-detect changes)
uv run alembic revision --autogenerate -m "Add user preferences"
# Create empty migration (manual)
uv run alembic revision -m "Custom data migration"
# Apply all migrations
uv run alembic upgrade head
# Apply next migration only
uv run alembic upgrade +1
# Rollback one migration
uv run alembic downgrade -1
# Rollback to specific revision
uv run alembic downgrade abc123
# Show current revision
uv run alembic current
# Show migration history
uv run alembic history
# Show history with the current revision marked
uv run alembic history --indicate-current
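To preview the SQL a migration would run without applying it, use Alembic's offline mode:

# Emit the SQL for pending migrations to stdout instead of executing it
uv run alembic upgrade head --sql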
Writing Migrations¶
Auto-generated Migration¶
"""Add user preferences table
Revision ID: abc123def456
Revises: 789xyz
Create Date: 2024-01-15 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
revision = 'abc123def456'
down_revision = '789xyz'
branch_labels = None
depends_on = None
def upgrade() -> None:
op.create_table(
'user_preferences',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('key', sa.String(100), nullable=False),
sa.Column('value', sa.Text(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('NOW()'), nullable=False),
sa.ForeignKeyConstraint(['user_id'], ['users.id'], ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('user_id', 'key', name='uq_user_preferences_user_key')
)
op.create_index('ix_user_preferences_user_id', 'user_preferences', ['user_id'])
def downgrade() -> None:
op.drop_index('ix_user_preferences_user_id')
op.drop_table('user_preferences')
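Autogenerate only detects changes it can see: env.py must point target_metadata at your models' metadata. A minimal sketch, assuming a declarative Base in app.models (the import path is illustrative; adjust to your project):

# env.py (excerpt)
from app.models import Base  # illustrative module path

# Autogenerate diffs the live database schema against this metadata
target_metadata = Base.metadata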
Manual Migration¶
"""Migrate legacy status values
Revision ID: def456ghi789
"""
from alembic import op
import sqlalchemy as sa
revision = 'def456ghi789'
down_revision = 'abc123def456'
def upgrade() -> None:
# Data migration: update status values
op.execute("""
UPDATE bookings
SET status = 'confirmed'
WHERE status = 'approved'
""")
op.execute("""
UPDATE bookings
SET status = 'cancelled'
WHERE status = 'rejected'
""")
def downgrade() -> None:
op.execute("""
UPDATE bookings
SET status = 'approved'
WHERE status = 'confirmed'
""")
op.execute("""
UPDATE bookings
SET status = 'rejected'
WHERE status = 'cancelled'
""")
Common Operations¶
Adding Columns¶
def upgrade() -> None:
    # Add nullable column (safe, instant)
    op.add_column('users', sa.Column('phone', sa.String(20), nullable=True))

    # Add column with a constant default (instant on PostgreSQL 11+;
    # older versions rewrite the whole table)
    op.add_column('users', sa.Column(
        'is_verified',
        sa.Boolean(),
        nullable=False,
        server_default=sa.text('false')
    ))

def downgrade() -> None:
    op.drop_column('users', 'phone')
    op.drop_column('users', 'is_verified')
Renaming Columns¶
def upgrade() -> None:
    op.alter_column('users', 'name', new_column_name='full_name')

def downgrade() -> None:
    op.alter_column('users', 'full_name', new_column_name='name')

Note that autogenerate cannot detect a rename (it sees a dropped column plus a new one), so write rename migrations by hand.
Changing Column Types¶
def upgrade() -> None:
    # Change varchar length
    op.alter_column('users', 'email',
        type_=sa.String(255),
        existing_type=sa.String(100)
    )

    # Change type with a cast
    op.alter_column('products', 'price',
        type_=sa.Numeric(10, 2),
        existing_type=sa.Integer(),
        postgresql_using='price::numeric(10,2)'
    )

def downgrade() -> None:
    op.alter_column('users', 'email',
        type_=sa.String(100),
        existing_type=sa.String(255)
    )
    # Reversing the price change would truncate decimals; see
    # "When Rollback Isn't Possible" below
Adding Indexes¶
def upgrade() -> None:
    # Standard index
    op.create_index('ix_bookings_user_id', 'bookings', ['user_id'])

    # Unique index
    op.create_index('ix_users_email', 'users', ['email'], unique=True)

    # Partial index
    op.create_index(
        'ix_users_email_active',
        'users',
        ['email'],
        postgresql_where=sa.text("status = 'active'")
    )

    # Concurrent index (no lock, but slower to build); cannot run inside
    # a transaction -- see "Adding Indexes on Large Tables" below
    op.execute("""
        CREATE INDEX CONCURRENTLY ix_large_table_col
        ON large_table(column_name)
    """)

def downgrade() -> None:
    op.drop_index('ix_bookings_user_id', table_name='bookings')
    op.drop_index('ix_users_email', table_name='users')
    op.drop_index('ix_users_email_active', table_name='users')
    op.execute("DROP INDEX CONCURRENTLY ix_large_table_col")
Adding Constraints¶
def upgrade() -> None:
    # Foreign key
    op.create_foreign_key(
        'fk_bookings_user',
        'bookings', 'users',
        ['user_id'], ['id'],
        ondelete='CASCADE'
    )

    # Check constraint
    op.create_check_constraint(
        'ck_bookings_amount_positive',
        'bookings',
        sa.text('amount > 0')
    )

    # Unique constraint
    op.create_unique_constraint(
        'uq_slots_date_time_resource',
        'slots',
        ['date', 'time_slot', 'resource_id']
    )

def downgrade() -> None:
    op.drop_constraint('fk_bookings_user', 'bookings', type_='foreignkey')
    op.drop_constraint('ck_bookings_amount_positive', 'bookings', type_='check')
    op.drop_constraint('uq_slots_date_time_resource', 'slots', type_='unique')
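Adding a foreign key or check constraint validates every existing row while holding a lock. On PostgreSQL you can split the two steps so the long scan happens without blocking writes. A sketch of that pattern (table and constraint names reuse the example above):

def upgrade() -> None:
    # Step 1: add the constraint without validating existing rows (brief lock)
    op.execute("""
        ALTER TABLE bookings
        ADD CONSTRAINT ck_bookings_amount_positive CHECK (amount > 0) NOT VALID
    """)
    # Step 2: validate existing rows under a weaker lock that allows writes
    op.execute("ALTER TABLE bookings VALIDATE CONSTRAINT ck_bookings_amount_positive")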
Data Migrations¶
Backfilling Data¶
"""Backfill user full names
Revision ID: xyz789
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import Session
revision = 'xyz789'
down_revision = 'abc456'
def upgrade() -> None:
# For small tables: single UPDATE
op.execute("""
UPDATE users
SET full_name = first_name || ' ' || last_name
WHERE full_name IS NULL
""")
def downgrade() -> None:
# Usually no-op for backfills
pass
Batched Data Migration¶
For large tables, process in batches to avoid long locks.
"""Migrate legacy data in batches
Revision ID: batch123
"""
from alembic import op
import sqlalchemy as sa
revision = 'batch123'
down_revision = 'xyz789'
BATCH_SIZE = 10000
!!! note "Transaction Configuration"
For controlling transaction behavior per migration (e.g., when using `COMMIT` within a migration), see [Adding Indexes on Large Tables](#adding-indexes-on-large-tables) below.
def upgrade() -> None:
conn = op.get_bind()
while True:
# Process batch
result = conn.execute(sa.text("""
UPDATE users
SET new_status =
CASE status
WHEN 'approved' THEN 'active'
WHEN 'rejected' THEN 'inactive'
ELSE status
END
WHERE id IN (
SELECT id FROM users
WHERE new_status IS NULL
LIMIT :batch_size
)
RETURNING id
"""), {"batch_size": BATCH_SIZE})
updated = result.rowcount
if updated == 0:
break
# Commit batch
conn.execute(sa.text("COMMIT"))
conn.execute(sa.text("BEGIN"))
def downgrade() -> None:
pass # Data migration, no rollback
Using ORM in Migrations¶
"""Complex data migration using ORM
Revision ID: orm123
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.orm import Session
revision = 'orm123'
down_revision = 'batch123'
def upgrade() -> None:
bind = op.get_bind()
session = Session(bind=bind)
# Define table for migration (don't import from models!)
users = sa.Table(
'users',
sa.MetaData(),
sa.Column('id', sa.Integer, primary_key=True),
sa.Column('email', sa.String),
sa.Column('normalized_email', sa.String),
)
# Query and update
for user in session.execute(sa.select(users)).fetchall():
session.execute(
sa.update(users)
.where(users.c.id == user.id)
.values(normalized_email=user.email.lower().strip())
)
session.commit()
def downgrade() -> None:
pass
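Row-by-row updates through the ORM are easy to read but slow. For a simple transform like this, a single set-based statement does the same work in one pass; a sketch using PostgreSQL's lower() and trim():

def upgrade() -> None:
    # Same normalization as above, pushed down to the database
    op.execute("""
        UPDATE users
        SET normalized_email = lower(trim(email))
    """)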
Safe Migration Patterns¶
Adding NOT NULL Columns¶
Wrong: Locks table while backfilling.
Right: Add nullable, backfill, then add constraint.
# Migration 1: Add nullable column
def upgrade():
    op.add_column('users', sa.Column('phone', sa.String(20), nullable=True))

# Migration 2: Backfill data (batched for large tables)
def upgrade():
    op.execute("UPDATE users SET phone = 'unknown' WHERE phone IS NULL")

# Migration 3: Add NOT NULL constraint
def upgrade():
    op.alter_column('users', 'phone', nullable=False)
Renaming Tables/Columns¶
Wrong: Breaks application during deployment.
Right: Expand-contract pattern.
# Migration 1: Add new column/table
def upgrade():
    op.add_column('users', sa.Column('full_name', sa.String(200), nullable=True))

# Deploy code that writes to BOTH columns

# Migration 2: Backfill data
def upgrade():
    op.execute("UPDATE users SET full_name = name WHERE full_name IS NULL")

# Migration 3: Make new column NOT NULL
def upgrade():
    op.alter_column('users', 'full_name', nullable=False)

# Deploy code that reads from new column only

# Migration 4: Drop old column
def upgrade():
    op.drop_column('users', 'name')
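During the dual-write window the application must keep both columns in sync. One minimal way to do that with SQLAlchemy is a validates hook on the old attribute; a sketch, assuming SQLAlchemy 2.0's DeclarativeBase and the column shapes from the migrations above:

import sqlalchemy as sa
from sqlalchemy.orm import DeclarativeBase, validates

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'

    id = sa.Column(sa.Integer, primary_key=True)
    name = sa.Column(sa.String(200))        # old column
    full_name = sa.Column(sa.String(200))   # new column

    @validates('name')
    def _sync_full_name(self, key, value):
        # Mirror every write to the old column into the new one
        self.full_name = value
        return value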
Adding Indexes on Large Tables¶
def upgrade() -> None:
    # CONCURRENTLY avoids locking the table, but the statement cannot
    # run inside a transaction block
    op.execute("""
        CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_large_table_col
        ON large_table(column_name)
    """)

def downgrade() -> None:
    op.execute("DROP INDEX CONCURRENTLY IF EXISTS ix_large_table_col")
Note: `CREATE INDEX CONCURRENTLY` must run outside a transaction. Setting `transaction_per_migration=True` in env.py gives each migration its own transaction (this is a global setting, not per migration):

# In env.py (applies to all migrations)
def run_migrations_online():
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        transaction_per_migration=True  # Each migration in its own transaction
    )
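Recent Alembic versions also provide an autocommit block, which commits the migration's transaction, runs its contents in autocommit mode, and then starts a fresh transaction. This keeps the concurrent statement in the migration file itself:

def upgrade() -> None:
    # Runs the enclosed statements outside the migration's transaction
    with op.get_context().autocommit_block():
        op.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS ix_large_table_col
            ON large_table(column_name)
        """)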
Rollback Strategies¶
Clean Rollback¶
# Rollback single migration
uv run alembic downgrade -1
# Rollback to specific revision
uv run alembic downgrade abc123
# Rollback all migrations
uv run alembic downgrade base
When Rollback Isn't Possible¶
Some changes can't be rolled back:

- Data loss (dropping columns with data)
- Type changes that lose precision
- Large data migrations

Strategy: Test thoroughly, and have database backups.
def downgrade() -> None:
    # Intentionally raise to prevent accidental rollback
    raise Exception(
        "This migration cannot be rolled back. "
        "Restore from backup if needed."
    )
Testing Migrations¶
Test Upgrade/Downgrade Cycle¶
# tests/test_migrations.py
import pytest
from alembic import command
from alembic.config import Config

@pytest.fixture
def alembic_config():
    return Config("alembic.ini")

def test_migrations_upgrade_downgrade(alembic_config, test_database):
    """Test that migrations can upgrade and downgrade cleanly."""
    # Upgrade to head
    command.upgrade(alembic_config, "head")

    # Downgrade to base
    command.downgrade(alembic_config, "base")

    # Upgrade again
    command.upgrade(alembic_config, "head")
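A stricter variant is the "stairway" test, which walks the history one revision at a time so a broken downgrade in the middle of the chain can't hide behind a working full cycle. A sketch for the same test module, assuming a linear history (no merge points):

from alembic.script import ScriptDirectory

def test_migrations_stairway(alembic_config, test_database):
    """Upgrade and downgrade each revision individually."""
    script = ScriptDirectory.from_config(alembic_config)
    # walk_revisions yields newest-first; reverse to start from the oldest
    for revision in reversed(list(script.walk_revisions("base", "heads"))):
        command.upgrade(alembic_config, revision.revision)
        command.downgrade(alembic_config, revision.down_revision or "-1")
        command.upgrade(alembic_config, revision.revision)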
Test Data Integrity¶
from sqlalchemy import text

def test_migration_preserves_data(alembic_config, test_database):
    """Test that data migration preserves existing data."""
    # Setup: Create test data at the previous revision
    command.upgrade(alembic_config, "abc123")

    # Insert test data
    with test_database.connect() as conn:
        conn.execute(text("INSERT INTO users (name) VALUES ('Test')"))
        conn.commit()

    # Run migration
    command.upgrade(alembic_config, "def456")

    # Verify data integrity
    with test_database.connect() as conn:
        result = conn.execute(text("SELECT * FROM users WHERE name = 'Test'"))
        assert result.fetchone() is not None
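It's also worth asserting that the history has exactly one head; branches created in parallel (e.g. by concurrent feature work) otherwise only surface at deploy time:

from alembic.script import ScriptDirectory

def test_single_head(alembic_config):
    """Parallel migration branches must be merged before deployment."""
    script = ScriptDirectory.from_config(alembic_config)
    assert len(script.get_heads()) == 1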
Best Practices¶
- One logical change per migration — Easier to review and roll back
- Always write downgrade — Even if it's just a comment explaining why not
- Test migrations on production-like data — Size and shape matter
- Use CONCURRENTLY for large table indexes — Avoid locking production
- Backfill in batches — Don't lock large tables
- Never modify deployed migrations — Create new ones instead
- Review auto-generated migrations — They're not always correct
- Keep migrations fast — Long-running migrations = downtime risk
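To enforce the "review auto-generated migrations" point in CI, newer Alembic releases (1.9+) include an alembic check command that exits non-zero when autogenerate would produce new operations:

# Fails if models and migration history have drifted apart
uv run alembic check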
See Also¶
- Database Patterns (SQLAlchemy) -- Model definitions, session lifecycle, and relationship loading
- Connection Management -- Connection pooling and session configuration that migrations depend on