Skip to content

Transactions

Understand ACID guarantees, isolation levels, and how to handle concurrent access.

ACID Properties

Property Meaning PostgreSQL Guarantee
Atomicity All or nothing Commit/rollback entire transaction
Consistency Valid state before and after Constraints enforced
Isolation Transactions don't interfere Configurable isolation levels
Durability Committed data persists WAL (Write-Ahead Log)

Basic Transaction Usage

SQL

BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;

-- On error
ROLLBACK;

Python with SQLAlchemy

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_funds(
    db: AsyncSession,
    from_account: int,
    to_account: int,
    amount: Decimal
):
    """Transfer funds between accounts atomically."""
    async with db.begin():  # Auto-commit on success, rollback on exception
        from_acc = await db.get(Account, from_account, with_for_update=True)
        to_acc = await db.get(Account, to_account, with_for_update=True)

        if from_acc.balance < amount:
            raise InsufficientFundsError()

        from_acc.balance -= amount
        to_acc.balance += amount
    # Transaction commits here if no exception

Explicit Transaction Control

async def complex_operation(db: AsyncSession):
    """Manual transaction management."""
    try:
        # Start transaction
        await db.begin()

        # Operations...
        user = User(name="Test")
        db.add(user)
        await db.flush()  # Get user.id without committing

        profile = Profile(user_id=user.id)
        db.add(profile)

        # Commit
        await db.commit()
    except Exception:
        await db.rollback()
        raise

Isolation Levels

Control how transactions see changes made by other transactions.

Level Comparison

Level Dirty Read Non-Repeatable Read Phantom Read Use Case
Read Uncommitted Possible Possible Possible Not in PostgreSQL
Read Committed No Possible Possible Default, most apps
Repeatable Read No No No* Financial reports
Serializable No No No Critical consistency

*PostgreSQL's Repeatable Read prevents phantoms too.

Read Committed (Default)

Each statement sees committed data as of statement start.

-- Transaction 1
BEGIN;
SELECT balance FROM accounts WHERE id = 1;  -- Returns 100

-- Transaction 2 commits: UPDATE balance = 50

SELECT balance FROM accounts WHERE id = 1;  -- Returns 50 (sees new data!)
COMMIT;

Repeatable Read

Transaction sees snapshot from transaction start.

SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
BEGIN;
SELECT balance FROM accounts WHERE id = 1;  -- Returns 100

-- Transaction 2 commits: UPDATE balance = 50

SELECT balance FROM accounts WHERE id = 1;  -- Still returns 100
COMMIT;

Conflict handling:

-- If Transaction 2 modified the same row:
UPDATE accounts SET balance = balance - 10 WHERE id = 1;
-- ERROR: could not serialize access due to concurrent update

-- Your code must retry the transaction

Serializable

Strongest isolation. Transactions behave as if run sequentially.

SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
-- Operations...
COMMIT;
-- May fail with serialization error if conflicts detected

Setting Isolation Level in SQLAlchemy

from sqlalchemy import text

# Per-transaction
async with db.begin():
    await db.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
    # ... operations

# Per-session
engine = create_async_engine(
    DATABASE_URL,
    isolation_level="REPEATABLE READ"
)

Locking

Row-Level Locks

-- Lock rows for update (blocks other UPDATE/DELETE/SELECT FOR UPDATE)
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;

-- Lock but don't wait (fail immediately if locked)
SELECT * FROM accounts WHERE id = 1 FOR UPDATE NOWAIT;

-- Lock but skip locked rows
SELECT * FROM accounts WHERE status = 'pending'
FOR UPDATE SKIP LOCKED LIMIT 10;

-- Share lock (blocks UPDATE/DELETE but not other SELECT FOR SHARE)
SELECT * FROM accounts WHERE id = 1 FOR SHARE;

SQLAlchemy Locking

from sqlalchemy import select

# SELECT FOR UPDATE
stmt = select(Account).where(Account.id == 1).with_for_update()
result = await db.execute(stmt)
account = result.scalar_one()

# SKIP LOCKED (for job queues)
stmt = (
    select(Job)
    .where(Job.status == "pending")
    .with_for_update(skip_locked=True)
    .limit(10)
)

# NOWAIT
stmt = select(Account).where(Account.id == 1).with_for_update(nowait=True)

Advisory Locks

Application-level locks not tied to rows.

-- Acquire lock (blocks until available)
SELECT pg_advisory_lock(12345);

-- Try to acquire (returns false if unavailable)
SELECT pg_try_advisory_lock(12345);

-- Release
SELECT pg_advisory_unlock(12345);

-- Session-level lock (released on disconnect)
SELECT pg_advisory_lock(hashtext('process_daily_report'));

SQLAlchemy Advisory Locks

from sqlalchemy import text

async def with_lock(db: AsyncSession, lock_id: int):
    """Execute with advisory lock."""
    # Acquire lock
    await db.execute(text(f"SELECT pg_advisory_lock({lock_id})"))
    try:
        # Do work...
        pass
    finally:
        # Release lock
        await db.execute(text(f"SELECT pg_advisory_unlock({lock_id})"))

Deadlocks

What Causes Deadlocks

Transaction 1: Locks row A, waits for row B
Transaction 2: Locks row B, waits for row A
→ Deadlock!

PostgreSQL Detection

PostgreSQL automatically detects deadlocks and aborts one transaction.

ERROR: deadlock detected
DETAIL: Process 1234 waits for ShareLock on transaction 5678;
        blocked by process 5678.
        Process 5678 waits for ShareLock on transaction 1234;
        blocked by process 1234.
HINT: See server log for query details.

Preventing Deadlocks

1. Lock in consistent order:

async def transfer(db, account_a, account_b, amount):
    # Always lock lower ID first
    ids = sorted([account_a, account_b])

    async with db.begin():
        for acc_id in ids:
            await db.execute(
                select(Account).where(Account.id == acc_id).with_for_update()
            )
        # Now safe to modify

2. Use NOWAIT to fail fast:

try:
    stmt = select(Account).where(Account.id == id).with_for_update(nowait=True)
    await db.execute(stmt)
except OperationalError:
    # Lock not available, retry later
    pass

3. Keep transactions short:

# Bad: Long transaction holding locks
async with db.begin():
    account = await get_account(db, id)
    await external_api_call()  # Slow! Don't hold locks during this
    account.balance += amount

# Good: Minimize lock duration
result = await external_api_call()
async with db.begin():
    account = await get_account(db, id, for_update=True)
    account.balance += result.amount

Savepoints

Partial rollback within a transaction.

BEGIN;
INSERT INTO users (name) VALUES ('Alice');

SAVEPOINT sp1;
INSERT INTO users (name) VALUES ('Bob');
-- Oops, wrong data
ROLLBACK TO SAVEPOINT sp1;

INSERT INTO users (name) VALUES ('Bob Correct');
COMMIT;
-- Alice and Bob Correct are committed

SQLAlchemy Savepoints

async with db.begin():
    user = User(name="Alice")
    db.add(user)
    await db.flush()

    # Nested transaction (savepoint)
    async with db.begin_nested():
        try:
            # Risky operation
            await risky_operation(db)
        except Exception:
            pass  # Savepoint rolled back, outer transaction continues

    # Continue outer transaction
    db.add(AuditLog(action="user_created"))
# Commit outer transaction

Transaction Patterns

Optimistic Locking

Check version before update, no locks held.

from sqlalchemy import Column, Integer

class Account(Base):
    id = Column(Integer, primary_key=True)
    balance = Column(Integer)
    version = Column(Integer, default=1)

async def update_balance(db: AsyncSession, account_id: int, new_balance: int):
    """Update with optimistic locking."""
    account = await db.get(Account, account_id)
    old_version = account.version

    result = await db.execute(
        update(Account)
        .where(Account.id == account_id)
        .where(Account.version == old_version)
        .values(balance=new_balance, version=old_version + 1)
    )

    if result.rowcount == 0:
        raise ConcurrentModificationError("Account was modified by another transaction")

    await db.commit()

Pessimistic Locking

Lock rows before reading.

async def update_balance_pessimistic(db: AsyncSession, account_id: int, delta: int):
    """Update with pessimistic locking."""
    async with db.begin():
        # Lock the row
        stmt = select(Account).where(Account.id == account_id).with_for_update()
        result = await db.execute(stmt)
        account = result.scalar_one()

        # Safe to modify
        account.balance += delta

Job Queue Pattern

Process jobs without double-processing.

async def claim_jobs(db: AsyncSession, limit: int = 10):
    """Claim pending jobs atomically."""
    async with db.begin():
        stmt = (
            select(Job)
            .where(Job.status == "pending")
            .with_for_update(skip_locked=True)
            .limit(limit)
        )
        result = await db.execute(stmt)
        jobs = result.scalars().all()

        for job in jobs:
            job.status = "processing"
            job.claimed_at = datetime.utcnow()

        return jobs

Best Practices

  1. Keep transactions short — Lock less, conflict less
  2. Use Read Committed unless you need stronger guarantees
  3. Lock in consistent order — Prevent deadlocks
  4. Use SKIP LOCKED for queues — Non-blocking job processing
  5. Prefer optimistic locking — For low-contention scenarios
  6. Always handle serialization failures — Retry on error
  7. Don't hold locks during external calls — Release locks first
  8. Use savepoints for partial rollback — Within complex transactions