Audit Trails¶
Track all changes to database records for compliance and debugging.
Audit Table Design¶
Basic Audit Table¶
-- Central audit table: one row per INSERT/UPDATE/DELETE on any audited table.
CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    record_id INTEGER NOT NULL,
    -- Constrain to the three DML actions the audit triggers write.
    action VARCHAR(10) NOT NULL CHECK (action IN ('INSERT', 'UPDATE', 'DELETE')),
    old_data JSONB,          -- row image before the change (UPDATE/DELETE)
    new_data JSONB,          -- row image after the change (INSERT/UPDATE)
    changed_fields TEXT[],   -- keys whose values differ between old_data and new_data
    user_id INTEGER,
    user_email VARCHAR(255),
    ip_address INET,
    user_agent TEXT,
    -- TIMESTAMPTZ stores an unambiguous instant (UTC internally);
    -- NOT NULL so every audit row is orderable.
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- Indexes for common queries
CREATE INDEX idx_audit_log_table_record ON audit_log (table_name, record_id);
CREATE INDEX idx_audit_log_created_at ON audit_log (created_at);
CREATE INDEX idx_audit_log_user_id ON audit_log (user_id);
CREATE INDEX idx_audit_log_action ON audit_log (action);
Partitioned Audit Table (for high volume)¶
-- High-volume variant: range-partitioned by created_at so old partitions
-- can be detached/dropped cheaply for retention.
CREATE TABLE audit_log (
    id BIGSERIAL,
    table_name VARCHAR(100) NOT NULL,
    record_id INTEGER NOT NULL,
    action VARCHAR(10) NOT NULL CHECK (action IN ('INSERT', 'UPDATE', 'DELETE')),
    old_data JSONB,
    new_data JSONB,
    changed_fields TEXT[],
    user_id INTEGER,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- The partition key must be part of the primary key on a partitioned table.
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);
-- Create monthly partitions (half-open ranges: FROM inclusive, TO exclusive)
CREATE TABLE audit_log_2024_01 PARTITION OF audit_log
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE audit_log_2024_02 PARTITION OF audit_log
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- etc.
Trigger-Based Auditing¶
Generic Audit Trigger¶
-- Generic row-level audit trigger: records INSERT/UPDATE/DELETE row images
-- into audit_log. Requires the audited table to have an integer "id" column.
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER AS $$
DECLARE
    old_data JSONB;
    new_data JSONB;
    changed_fields TEXT[];
    user_id_val INTEGER;
    -- NOTE: no "key" variable here — a plpgsql variable named "key" would
    -- conflict with the "key" column references in the queries below.
BEGIN
    -- Get user_id from the session variable set by the application.
    -- current_setting(..., true) returns NULL instead of raising when the
    -- variable is unset, so no broad EXCEPTION handler is needed.
    user_id_val := NULLIF(current_setting('app.current_user_id', true), '')::INTEGER;

    IF TG_OP = 'INSERT' THEN
        new_data := to_jsonb(NEW);
        INSERT INTO audit_log (table_name, record_id, action, new_data, user_id)
        VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', new_data, user_id_val);
        RETURN NEW;
    ELSIF TG_OP = 'UPDATE' THEN
        old_data := to_jsonb(OLD);
        new_data := to_jsonb(NEW);
        -- Symmetric difference of the two row images: a key counts as
        -- changed when its value differs or it exists on only one side.
        -- UNION (not UNION ALL) dedupes keys reported by both branches.
        SELECT array_agg(t.key)
        INTO changed_fields
        FROM (
            SELECT o.key
            FROM jsonb_each(old_data) AS o(key, value)
            WHERE NOT EXISTS (
                SELECT 1 FROM jsonb_each(new_data) AS n
                WHERE n.key = o.key AND n.value = o.value
            )
            UNION
            SELECT n.key
            FROM jsonb_each(new_data) AS n(key, value)
            WHERE NOT EXISTS (
                SELECT 1 FROM jsonb_each(old_data) AS o
                WHERE o.key = n.key AND o.value = n.value
            )
        ) t;
        -- Skip no-op updates: NULL array means nothing actually changed.
        IF changed_fields IS NOT NULL THEN
            INSERT INTO audit_log (table_name, record_id, action, old_data, new_data, changed_fields, user_id)
            VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', old_data, new_data, changed_fields, user_id_val);
        END IF;
        RETURN NEW;
    ELSIF TG_OP = 'DELETE' THEN
        old_data := to_jsonb(OLD);
        INSERT INTO audit_log (table_name, record_id, action, old_data, user_id)
        VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', old_data, user_id_val);
        RETURN OLD;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
Apply Trigger to Tables¶
-- Attach the generic audit trigger to each audited table.

-- users
CREATE TRIGGER users_audit_trigger
    AFTER INSERT OR UPDATE OR DELETE
    ON users
    FOR EACH ROW
    EXECUTE FUNCTION audit_trigger_func();

-- bookings
CREATE TRIGGER bookings_audit_trigger
    AFTER INSERT OR UPDATE OR DELETE
    ON bookings
    FOR EACH ROW
    EXECUTE FUNCTION audit_trigger_func();
Exclude Fields from Audit¶
-- Variant of audit_trigger_func that strips noisy/sensitive columns from the
-- recorded row images (the change-detection and INSERT logic is unchanged).
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER AS $$
DECLARE
    -- Columns that should never appear in the audit trail: churn-only
    -- timestamps and secrets.
    excluded_fields TEXT[] := ARRAY['updated_at', 'last_login', 'password_hash'];
    old_data JSONB;
    new_data JSONB;
    -- ... rest of declarations
BEGIN
    old_data := to_jsonb(OLD);
    new_data := to_jsonb(NEW);
    -- jsonb - text[] removes all listed keys in one operation.
    old_data := old_data - excluded_fields;
    new_data := new_data - excluded_fields;
    -- ... rest of logic (change detection and INSERT into audit_log)
END;
$$ LANGUAGE plpgsql;
Application-Level Auditing¶
SQLAlchemy Event-Based¶
from datetime import datetime, timedelta, timezone

from sqlalchemy import event, inspect
from sqlalchemy.orm import Session
def get_changes(obj) -> dict:
    """Return ``{attr_name: {"old": ..., "new": ...}}`` for modified columns.

    Uses the SQLAlchemy runtime inspection API: per-attribute history lives
    on the instance's ``AttributeState`` (``inspect(obj).attrs``), not on the
    mapper's ``ColumnProperty`` objects, which have no ``.history``.
    """
    state = inspect(obj)
    changes = {}
    for attr in state.mapper.column_attrs:
        hist = state.attrs[attr.key].history
        if hist.has_changes():
            changes[attr.key] = {
                # deleted/added are lists; empty means "no prior value" /
                # "no new value" respectively.
                "old": hist.deleted[0] if hist.deleted else None,
                "new": hist.added[0] if hist.added else None,
            }
    return changes
@event.listens_for(Session, "before_flush")
def audit_changes(session, flush_context, instances):
    """Create audit records for changes.

    Audits any object whose class opts in via ``__audit__ = True`` (see
    AuditMixin). Runs once per flush, before changes hit the database.

    NOTE(review): for objects in ``session.new``, ``obj.id`` is typically
    still None at before_flush time (the PK is assigned during flush), so
    INSERT audit rows may record record_id=None — confirm, or consider
    moving INSERT auditing to an ``after_flush`` hook.
    """
    # User context is attached to the session by the request layer
    # (see get_db_with_user); None when no user is bound.
    user_id = getattr(session, "current_user_id", None)

    # Pending INSERTs.
    for obj in session.new:
        if hasattr(obj, "__audit__") and obj.__audit__:
            audit = AuditLog(
                table_name=obj.__tablename__,
                record_id=obj.id,
                action="INSERT",
                new_data=obj.to_dict(),
                user_id=user_id,
            )
            session.add(audit)

    # Pending UPDATEs: only audit when at least one column actually changed.
    for obj in session.dirty:
        if hasattr(obj, "__audit__") and obj.__audit__:
            changes = get_changes(obj)
            if changes:
                audit = AuditLog(
                    table_name=obj.__tablename__,
                    record_id=obj.id,
                    action="UPDATE",
                    old_data={k: v["old"] for k, v in changes.items()},
                    new_data={k: v["new"] for k, v in changes.items()},
                    changed_fields=list(changes.keys()),
                    user_id=user_id,
                )
                session.add(audit)

    # Pending DELETEs: capture the full row image before it disappears.
    for obj in session.deleted:
        if hasattr(obj, "__audit__") and obj.__audit__:
            audit = AuditLog(
                table_name=obj.__tablename__,
                record_id=obj.id,
                action="DELETE",
                old_data=obj.to_dict(),
                user_id=user_id,
            )
            session.add(audit)
Mark Models for Auditing¶
class AuditMixin:
    """Mixin that opts a model into audit logging."""

    # Checked by the before_flush listener to decide whether to audit.
    __audit__ = True

    def to_dict(self) -> dict:
        """Return a column-name -> value snapshot for audit logging."""
        snapshot = {}
        for column in self.__table__.columns:
            snapshot[column.name] = getattr(self, column.name)
        return snapshot
class User(Base, AuditMixin):
    """Example model: inheriting AuditMixin opts it into audit logging."""
    __tablename__ = "users"
    # ... columns
Passing User Context¶
Set Session Variable¶
from sqlalchemy import text
async def get_db_with_user(
    request: Request,
    db: AsyncSession = Depends(get_db)
) -> AsyncGenerator[AsyncSession, None]:
    """Database session with user context for auditing.

    Exposes the current user id both to Postgres (as the transaction-local
    setting ``app.current_user_id``, read by the audit triggers) and to the
    SQLAlchemy session (read by the before_flush audit listener).
    """
    user = request.state.user
    if user:
        # set_config(name, value, is_local := true) is equivalent to
        # SET LOCAL, but lets us bind the value as a parameter instead of
        # interpolating it into the SQL string (SET itself cannot take
        # bind parameters). Never f-string user data into SQL.
        await db.execute(
            text("SELECT set_config('app.current_user_id', :uid, true)"),
            {"uid": str(user.id)},
        )
        db.current_user_id = user.id
    yield db
FastAPI Middleware¶
from starlette.middleware.base import BaseHTTPMiddleware
class AuditContextMiddleware(BaseHTTPMiddleware):
    """Capture per-request metadata for audit records."""

    async def dispatch(self, request: Request, call_next):
        # request.client can be None (some test clients, unix-socket
        # deployments), so guard before reading .host.
        client = request.client
        request.state.audit_context = {
            "ip_address": client.host if client else None,
            "user_agent": request.headers.get("user-agent"),
            "request_id": request.headers.get("x-request-id"),
        }
        return await call_next(request)
Querying Audit Data¶
Record History¶
-- Full change history of one record, oldest first.
SELECT
    action,
    changed_fields,
    old_data,
    new_data,
    user_id,
    created_at
FROM audit_log
WHERE table_name = 'users'
  AND record_id = 123
-- id breaks ties when several changes share a created_at timestamp
ORDER BY created_at, id;
Who Changed What¶
-- Most recent changes made by a specific user.
SELECT
    table_name,
    record_id,
    action,
    changed_fields,
    created_at
FROM audit_log
WHERE user_id = 456
-- id DESC breaks created_at ties so the LIMIT is deterministic
ORDER BY created_at DESC, id DESC
LIMIT 100;
Recent Activity¶
-- Per-table, per-action change counts over the last hour,
-- busiest combinations first.
SELECT
    table_name,
    action,
    COUNT(*) AS changes,
    COUNT(DISTINCT user_id) AS users
FROM audit_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY table_name, action
ORDER BY changes DESC;
Reconstruct State at Point in Time¶
-- State of a record as of a point in time: the row image written by the
-- latest audited change at or before the cutoff. Yields a NULL new_data row
-- if that latest change was a DELETE (the record did not exist then).
SELECT new_data
FROM audit_log
WHERE table_name = 'users'
  AND record_id = 123
  AND created_at <= '2024-01-15 14:00:00'
-- id DESC breaks created_at ties deterministically; ORDER BY + LIMIT 1
-- replaces the heavier ROW_NUMBER() window for a single latest row
ORDER BY created_at DESC, id DESC
LIMIT 1;
Audit Log Retention¶
Archive Old Records¶
async def archive_old_audit_logs(db: AsyncSession, days: int = 90):
    """Move audit rows older than ``days`` into audit_log_archive.

    A single data-modifying CTE moves the rows atomically in one statement,
    so a row can never be deleted without having been archived (and vice
    versa), even if something fails mid-way.
    """
    # Timezone-aware cutoff; datetime.utcnow() is naive and deprecated.
    cutoff = datetime.now(timezone.utc) - timedelta(days=days)
    await db.execute(text("""
        WITH moved AS (
            DELETE FROM audit_log
            WHERE created_at < :cutoff
            RETURNING *
        )
        INSERT INTO audit_log_archive
        SELECT * FROM moved
    """), {"cutoff": cutoff})
    await db.commit()
Summarize Before Deletion¶
-- Roll up per-table monthly action counts before purging year-old rows,
-- so aggregate history survives retention deletes.
INSERT INTO audit_summary (table_name, month, insert_count, update_count, delete_count)
SELECT
    table_name,
    DATE_TRUNC('month', created_at) AS month,
    COUNT(*) FILTER (WHERE action = 'INSERT') AS insert_count,
    COUNT(*) FILTER (WHERE action = 'UPDATE') AS update_count,
    COUNT(*) FILTER (WHERE action = 'DELETE') AS delete_count
FROM audit_log
WHERE created_at < NOW() - INTERVAL '1 year'
GROUP BY table_name, DATE_TRUNC('month', created_at);
Best Practices¶
- Audit only what matters — Not every table needs auditing
- Exclude sensitive fields — Don't log passwords or tokens
- Include user context — Who made the change
- Partition large tables — For performance and retention
- Index for common queries — table_name + record_id, created_at
- Set retention policy — Archive or delete old records
- Monitor table size — Audit tables grow fast
- Test restore from audit — Can you actually use the data?