Transactions¶
Understand ACID guarantees, isolation levels, and how to handle concurrent access.
ACID Properties¶
| Property | Meaning | PostgreSQL Guarantee |
|---|---|---|
| Atomicity | All or nothing | Commit/rollback entire transaction |
| Consistency | Valid state before and after | Constraints enforced |
| Isolation | Transactions don't interfere | Configurable isolation levels |
| Durability | Committed data persists | WAL (Write-Ahead Log) |
Basic Transaction Usage¶
SQL¶
BEGIN;
UPDATE accounts SET balance = balance - 100 WHERE id = 1;
UPDATE accounts SET balance = balance + 100 WHERE id = 2;
COMMIT;
-- On error
ROLLBACK;
Python with SQLAlchemy¶
from decimal import Decimal

from sqlalchemy.ext.asyncio import AsyncSession

async def transfer_funds(
    db: AsyncSession,
    from_account: int,
    to_account: int,
    amount: Decimal,
):
    """Transfer funds between accounts atomically."""
    async with db.begin():  # Auto-commit on success, rollback on exception
        from_acc = await db.get(Account, from_account, with_for_update=True)
        to_acc = await db.get(Account, to_account, with_for_update=True)
        if from_acc.balance < amount:
            raise InsufficientFundsError()
        from_acc.balance -= amount
        to_acc.balance += amount
    # Transaction commits here if no exception
Explicit Transaction Control¶
async def complex_operation(db: AsyncSession):
    """Manual transaction management."""
    try:
        # Start transaction
        await db.begin()
        # Operations...
        user = User(name="Test")
        db.add(user)
        await db.flush()  # Get user.id without committing
        profile = Profile(user_id=user.id)
        db.add(profile)
        # Commit
        await db.commit()
    except Exception:
        await db.rollback()
        raise
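The commit/rollback boilerplate above can be factored into a reusable decorator. This is a sketch, not a SQLAlchemy API: `transactional` is a hypothetical helper that assumes the wrapped coroutine takes the session as its first argument.

```python
from functools import wraps

def transactional(func):
    """Commit if the wrapped coroutine succeeds; roll back and
    re-raise if it throws (mirrors the try/except pattern above)."""
    @wraps(func)
    async def wrapper(db, *args, **kwargs):
        try:
            result = await func(db, *args, **kwargs)
            await db.commit()
            return result
        except Exception:
            await db.rollback()
            raise
    return wrapper

# Usage: @transactional above any coroutine whose first argument is the session.
```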
Isolation Levels¶
Control how transactions see changes made by other transactions.
Level Comparison¶
| Level | Dirty Read | Non-Repeatable Read | Phantom Read | Use Case |
|---|---|---|---|---|
| Read Uncommitted | Possible | Possible | Possible | Treated as Read Committed in PostgreSQL |
| Read Committed | No | Possible | Possible | Default, most apps |
| Repeatable Read | No | No | No* | Financial reports |
| Serializable | No | No | No | Critical consistency |
*PostgreSQL's Repeatable Read prevents phantoms too.
Read Committed (Default)¶
Each statement sees committed data as of statement start.
-- Transaction 1
BEGIN;
SELECT balance FROM accounts WHERE id = 1; -- Returns 100
-- Transaction 2 commits: UPDATE balance = 50
SELECT balance FROM accounts WHERE id = 1; -- Returns 50 (sees new data!)
COMMIT;
Repeatable Read¶
The transaction sees a snapshot taken at its first query and keeps it for the rest of the transaction.
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; -- Must run inside the transaction, before its first query
SELECT balance FROM accounts WHERE id = 1; -- Returns 100
-- Transaction 2 commits: UPDATE balance = 50
SELECT balance FROM accounts WHERE id = 1; -- Still returns 100
COMMIT;
Conflict handling:
-- If Transaction 2 modified the same row:
UPDATE accounts SET balance = balance - 10 WHERE id = 1;
-- ERROR: could not serialize access due to concurrent update
-- Your code must retry the transaction
Serializable¶
Strongest isolation. Transactions behave as if run sequentially.
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
BEGIN;
-- Operations...
COMMIT;
-- May fail with serialization error if conflicts detected
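Both Repeatable Read and Serializable can abort with a serialization failure, so the whole transaction should be wrapped in a retry loop. A minimal sketch: the retryable exception type is an assumption you must supply for your driver (with asyncpg it commonly surfaces as a `sqlalchemy.exc.DBAPIError` carrying SQLSTATE 40001).

```python
import asyncio

async def run_with_retries(txn, retries=3, retryable=(Exception,), base_delay=0.05):
    """Run `txn` (an async callable executing one full transaction),
    retrying with exponential backoff on serialization failures."""
    for attempt in range(retries):
        try:
            return await txn()
        except retryable:
            if attempt == retries - 1:
                raise  # Out of retries; surface the failure
            await asyncio.sleep(base_delay * 2 ** attempt)
```

Retry the entire transaction callable, not a single statement: after a serialization error the transaction is aborted and must be restarted from the beginning.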
Setting Isolation Level in SQLAlchemy¶
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Per-transaction (must run before the transaction's first query)
async with db.begin():
    await db.execute(text("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ"))
    # ... operations

# Per-engine (applies to every connection from this engine)
engine = create_async_engine(
    DATABASE_URL,
    isolation_level="REPEATABLE READ",
)
Locking¶
Row-Level Locks¶
-- Lock rows for update (blocks other UPDATE/DELETE/SELECT FOR UPDATE)
SELECT * FROM accounts WHERE id = 1 FOR UPDATE;
-- Lock but don't wait (fail immediately if locked)
SELECT * FROM accounts WHERE id = 1 FOR UPDATE NOWAIT;
-- Lock but skip locked rows
SELECT * FROM accounts WHERE status = 'pending'
FOR UPDATE SKIP LOCKED LIMIT 10;
-- Share lock (blocks UPDATE/DELETE but not other SELECT FOR SHARE)
SELECT * FROM accounts WHERE id = 1 FOR SHARE;
SQLAlchemy Locking¶
from sqlalchemy import select

# SELECT FOR UPDATE
stmt = select(Account).where(Account.id == 1).with_for_update()
result = await db.execute(stmt)
account = result.scalar_one()

# SKIP LOCKED (for job queues)
stmt = (
    select(Job)
    .where(Job.status == "pending")
    .with_for_update(skip_locked=True)
    .limit(10)
)

# NOWAIT
stmt = select(Account).where(Account.id == 1).with_for_update(nowait=True)
Advisory Locks¶
Application-level locks not tied to rows.
-- Acquire lock (blocks until available)
SELECT pg_advisory_lock(12345);
-- Try to acquire (returns false if unavailable)
SELECT pg_try_advisory_lock(12345);
-- Release
SELECT pg_advisory_unlock(12345);
-- Advisory locks are session-scoped: held until explicitly released or disconnect
-- Use hashtext() to derive a lock key from a name
SELECT pg_advisory_lock(hashtext('process_daily_report'));
SQLAlchemy Advisory Locks¶
from sqlalchemy import text

async def with_lock(db: AsyncSession, lock_id: int):
    """Execute with advisory lock."""
    # Acquire lock (bind parameter avoids interpolating values into SQL)
    await db.execute(text("SELECT pg_advisory_lock(:id)"), {"id": lock_id})
    try:
        # Do work...
        pass
    finally:
        # Release lock
        await db.execute(text("SELECT pg_advisory_unlock(:id)"), {"id": lock_id})
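A non-blocking variant can wrap `pg_try_advisory_lock` in an async context manager that yields whether the lock was obtained. This is a sketch, not an official SQLAlchemy recipe; `try_advisory_lock` is a hypothetical helper assuming an `AsyncSession`-like `db`.

```python
from contextlib import asynccontextmanager
from sqlalchemy import text

@asynccontextmanager
async def try_advisory_lock(db, lock_id: int):
    """Yield True if the advisory lock was acquired, False otherwise;
    release it on exit only if it was actually held."""
    result = await db.execute(
        text("SELECT pg_try_advisory_lock(:id)"), {"id": lock_id}
    )
    acquired = result.scalar_one()
    try:
        yield acquired
    finally:
        if acquired:
            await db.execute(
                text("SELECT pg_advisory_unlock(:id)"), {"id": lock_id}
            )
```

Callers check the yielded flag and skip the work (or retry later) when it is False, instead of blocking.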
Deadlocks¶
What Causes Deadlocks¶
Two transactions each hold a lock the other needs, typically because they acquired locks in opposite order: transaction A locks row 1 and waits for row 2, while transaction B locks row 2 and waits for row 1. Neither can proceed until one is aborted.
PostgreSQL Detection¶
PostgreSQL automatically detects deadlocks and aborts one transaction.
ERROR: deadlock detected
DETAIL: Process 1234 waits for ShareLock on transaction 5678;
blocked by process 5678.
Process 5678 waits for ShareLock on transaction 1234;
blocked by process 1234.
HINT: See server log for query details.
Preventing Deadlocks¶
1. Lock in consistent order:
async def transfer(db, account_a, account_b, amount):
    # Always lock lower ID first
    ids = sorted([account_a, account_b])
    async with db.begin():
        for acc_id in ids:
            await db.execute(
                select(Account).where(Account.id == acc_id).with_for_update()
            )
        # Now safe to modify
2. Use NOWAIT to fail fast:
from sqlalchemy.exc import OperationalError

try:
    stmt = select(Account).where(Account.id == id).with_for_update(nowait=True)
    await db.execute(stmt)
except OperationalError:
    # Lock not available, retry later
    pass
3. Keep transactions short:
# Bad: Long transaction holding locks
async with db.begin():
    account = await get_account(db, id)
    await external_api_call()  # Slow! Don't hold locks during this
    account.balance += amount

# Good: Minimize lock duration
result = await external_api_call()
async with db.begin():
    account = await get_account(db, id, for_update=True)
    account.balance += result.amount
Savepoints¶
Partial rollback within a transaction.
BEGIN;
INSERT INTO users (name) VALUES ('Alice');
SAVEPOINT sp1;
INSERT INTO users (name) VALUES ('Bob');
-- Oops, wrong data
ROLLBACK TO SAVEPOINT sp1;
INSERT INTO users (name) VALUES ('Bob Correct');
COMMIT;
-- Alice and Bob Correct are committed
SQLAlchemy Savepoints¶
async with db.begin():
    user = User(name="Alice")
    db.add(user)
    await db.flush()

    # Nested transaction (savepoint). The try must wrap the begin_nested
    # block: the savepoint only rolls back if the exception propagates
    # out of the context manager.
    try:
        async with db.begin_nested():
            # Risky operation
            await risky_operation(db)
    except Exception:
        pass  # Savepoint rolled back, outer transaction continues

    # Continue outer transaction
    db.add(AuditLog(action="user_created"))
# Commit outer transaction
Transaction Patterns¶
Optimistic Locking¶
Check version before update, no locks held.
from sqlalchemy import Column, Integer, update
from sqlalchemy.ext.asyncio import AsyncSession

class Account(Base):
    __tablename__ = "accounts"

    id = Column(Integer, primary_key=True)
    balance = Column(Integer)
    version = Column(Integer, default=1)

async def update_balance(db: AsyncSession, account_id: int, new_balance: int):
    """Update with optimistic locking."""
    account = await db.get(Account, account_id)
    old_version = account.version

    result = await db.execute(
        update(Account)
        .where(Account.id == account_id)
        .where(Account.version == old_version)
        .values(balance=new_balance, version=old_version + 1)
    )
    if result.rowcount == 0:
        raise ConcurrentModificationError("Account was modified by another transaction")
    await db.commit()
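The compare-and-swap idea behind the version column can be seen in isolation, without a database. A sketch with a plain dict standing in for the row; `VersionConflict` is a stand-in for the `ConcurrentModificationError` used above.

```python
class VersionConflict(Exception):
    pass

def optimistic_update(row: dict, expected_version: int, new_balance: int) -> None:
    """Apply the change only if no other writer bumped the version,
    mirroring UPDATE ... WHERE version = :old_version."""
    if row["version"] != expected_version:
        raise VersionConflict("row changed since it was read")
    row["balance"] = new_balance
    row["version"] = expected_version + 1
```

On conflict the caller re-reads the row (getting the fresh version) and retries, so no lock is ever held between read and write.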
Pessimistic Locking¶
Lock rows before reading.
async def update_balance_pessimistic(db: AsyncSession, account_id: int, delta: int):
    """Update with pessimistic locking."""
    async with db.begin():
        # Lock the row
        stmt = select(Account).where(Account.id == account_id).with_for_update()
        result = await db.execute(stmt)
        account = result.scalar_one()
        # Safe to modify
        account.balance += delta
Job Queue Pattern¶
Process jobs without double-processing.
from datetime import datetime

async def claim_jobs(db: AsyncSession, limit: int = 10):
    """Claim pending jobs atomically."""
    async with db.begin():
        stmt = (
            select(Job)
            .where(Job.status == "pending")
            .with_for_update(skip_locked=True)
            .limit(limit)
        )
        result = await db.execute(stmt)
        jobs = result.scalars().all()
        for job in jobs:
            job.status = "processing"
            job.claimed_at = datetime.utcnow()
        return jobs
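A worker then alternates between claiming a batch and processing it, backing off when nothing is pending. A sketch under stated assumptions: `claim` and `handle` are hypothetical callables you supply (e.g. `claim` calls `claim_jobs` with a fresh session, `handle` processes one job), and the stop condition exists only to make the loop finite.

```python
import asyncio

async def run_worker(claim, handle, max_empty_polls=3, poll_interval=1.0):
    """Claim and process job batches until `max_empty_polls`
    consecutive polls return nothing, then stop."""
    empty = 0
    processed = 0
    while empty < max_empty_polls:
        jobs = await claim()
        if not jobs:
            empty += 1
            await asyncio.sleep(poll_interval)  # nothing claimed; back off
            continue
        empty = 0
        for job in jobs:
            await handle(job)
            processed += 1
    return processed
```

A long-running worker would replace the stop condition with `while True` (or a shutdown signal) and mark each job done or failed inside `handle`.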
Best Practices¶
- Keep transactions short — Lock less, conflict less
- Use Read Committed unless you need stronger guarantees
- Lock in consistent order — Prevent deadlocks
- Use SKIP LOCKED for queues — Non-blocking job processing
- Prefer optimistic locking — For low-contention scenarios
- Always handle serialization failures — Retry on error
- Don't hold locks during external calls — Release locks first
- Use savepoints for partial rollback — Within complex transactions