Skip to content

Audit Trails

Track all changes to database records for compliance and debugging.

Audit Table Design

Basic Audit Table

CREATE TABLE audit_log (
    id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    record_id INTEGER NOT NULL,
    -- Constrain to the three DML actions the audit trigger writes
    action VARCHAR(10) NOT NULL
        CONSTRAINT audit_log_action_check CHECK (action IN ('INSERT', 'UPDATE', 'DELETE')),
    old_data JSONB,          -- row image before the change (UPDATE/DELETE)
    new_data JSONB,          -- row image after the change (INSERT/UPDATE)
    changed_fields TEXT[],   -- column names whose values differ old -> new
    user_id INTEGER,
    user_email VARCHAR(255),
    ip_address INET,
    user_agent TEXT,
    -- TIMESTAMPTZ records an unambiguous instant; naive TIMESTAMP silently
    -- loses the zone, which matters for a compliance trail
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Indexes for common queries; IF NOT EXISTS keeps the migration re-runnable
CREATE INDEX IF NOT EXISTS idx_audit_log_table_record ON audit_log (table_name, record_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_created_at ON audit_log (created_at);
CREATE INDEX IF NOT EXISTS idx_audit_log_user_id ON audit_log (user_id);
-- NOTE: action has only three distinct values, so a standalone index here is
-- rarely selective enough to be used -- drop it if pg_stat_user_indexes shows
-- no scans, or widen it (e.g. (action, created_at)) for the queries you run.
CREATE INDEX IF NOT EXISTS idx_audit_log_action ON audit_log (action);

Partitioned Audit Table (for high volume)

CREATE TABLE audit_log (
    id BIGSERIAL,
    table_name VARCHAR(100) NOT NULL,
    record_id INTEGER NOT NULL,
    action VARCHAR(10) NOT NULL
        CONSTRAINT audit_log_action_check CHECK (action IN ('INSERT', 'UPDATE', 'DELETE')),
    old_data JSONB,
    new_data JSONB,
    changed_fields TEXT[],
    user_id INTEGER,
    -- Store an unambiguous instant; this column is also the partition key
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    -- PostgreSQL requires the partition key to be part of the primary key
    PRIMARY KEY (id, created_at)
) PARTITION BY RANGE (created_at);

-- Create monthly partitions (ranges are half-open: FROM inclusive, TO exclusive)
CREATE TABLE audit_log_2024_01 PARTITION OF audit_log
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE audit_log_2024_02 PARTITION OF audit_log
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
-- etc. -- automate future months (pg_partman or a scheduled job)

-- Without a DEFAULT partition, an insert whose created_at falls outside every
-- defined range fails outright -- catch such rows instead of losing the write.
CREATE TABLE audit_log_default PARTITION OF audit_log DEFAULT;

Trigger-Based Auditing

Generic Audit Trigger

-- Generic row-level audit trigger: writes one audit_log row per INSERT/UPDATE/
-- DELETE, capturing before/after images as JSONB and the acting user id.
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER AS $$
DECLARE
    old_data JSONB;
    new_data JSONB;
    changed_fields TEXT[];
    user_id_val INTEGER;
    -- NOTE: the original declared a variable named "key", which is ambiguous
    -- with the "key" column aliases in the diff query below and raises
    -- "column reference is ambiguous" at runtime under the default
    -- variable_conflict setting. It was unused and has been removed.
BEGIN
    -- User id set per-transaction by the application (via set_config/SET LOCAL).
    -- current_setting(..., missing_ok => true) returns NULL when unset, so no
    -- EXCEPTION block is needed -- each EXCEPTION block opens a subtransaction
    -- per audited row, which is measurable overhead on hot tables.
    -- NOTE(review): still raises if the setting holds a non-integer value.
    user_id_val := NULLIF(current_setting('app.current_user_id', true), '')::INTEGER;

    -- NOTE(review): assumes every audited table has an integer "id" column --
    -- confirm before attaching this trigger to a new table.
    IF TG_OP = 'INSERT' THEN
        new_data := to_jsonb(NEW);
        INSERT INTO audit_log (table_name, record_id, action, new_data, user_id)
        VALUES (TG_TABLE_NAME, NEW.id, 'INSERT', new_data, user_id_val);
        RETURN NEW;

    ELSIF TG_OP = 'UPDATE' THEN
        old_data := to_jsonb(OLD);
        new_data := to_jsonb(NEW);

        -- Changed keys: merge both images (|| yields the union of keys), then
        -- keep keys whose values differ. IS DISTINCT FROM is NULL-safe, so
        -- keys present in only one image are detected as changed too.
        SELECT array_agg(k.key)
        INTO changed_fields
        FROM (SELECT jsonb_object_keys(old_data || new_data) AS key) AS k
        WHERE old_data -> k.key IS DISTINCT FROM new_data -> k.key;

        -- array_agg over zero rows is NULL: skip no-op updates entirely
        IF changed_fields IS NOT NULL THEN
            INSERT INTO audit_log (table_name, record_id, action, old_data, new_data, changed_fields, user_id)
            VALUES (TG_TABLE_NAME, NEW.id, 'UPDATE', old_data, new_data, changed_fields, user_id_val);
        END IF;
        RETURN NEW;

    ELSIF TG_OP = 'DELETE' THEN
        old_data := to_jsonb(OLD);
        INSERT INTO audit_log (table_name, record_id, action, old_data, user_id)
        VALUES (TG_TABLE_NAME, OLD.id, 'DELETE', old_data, user_id_val);
        RETURN OLD;
    END IF;

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

Apply Trigger to Tables

-- Apply to users table. DROP ... IF EXISTS first so the migration is
-- idempotent (CREATE TRIGGER alone fails if the trigger already exists).
DROP TRIGGER IF EXISTS users_audit_trigger ON users;
CREATE TRIGGER users_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();

-- Apply to bookings table
DROP TRIGGER IF EXISTS bookings_audit_trigger ON bookings;
CREATE TRIGGER bookings_audit_trigger
AFTER INSERT OR UPDATE OR DELETE ON bookings
FOR EACH ROW EXECUTE FUNCTION audit_trigger_func();

Exclude Fields from Audit

-- Variant sketch: strip noisy/sensitive fields from the images before logging.
-- (Body elided in the original -- this is illustrative, not runnable as-is.)
CREATE OR REPLACE FUNCTION audit_trigger_func()
RETURNS TRIGGER AS $$
DECLARE
    -- Fields removed from both row images: churn-only timestamps, plus
    -- secrets -- never write password hashes to an audit trail.
    excluded_fields TEXT[] := ARRAY['updated_at', 'last_login', 'password_hash'];
    -- ... rest of function
BEGIN
    -- Remove excluded fields from old_data and new_data
    -- (jsonb - text[] deletes every listed key; PostgreSQL 10+)
    old_data := old_data - excluded_fields;
    new_data := new_data - excluded_fields;
    -- ... rest of logic
END;
$$ LANGUAGE plpgsql;

Application-Level Auditing

SQLAlchemy Event-Based

from sqlalchemy import event
from sqlalchemy.orm import Session
from datetime import datetime

def get_changes(obj) -> dict:
    """Return ``{attr_name: {"old": ..., "new": ...}}`` for modified attributes.

    Change history lives on the per-instance attribute state, obtained via
    ``sqlalchemy.inspect(obj).attrs[key].history`` -- the original read
    ``.history`` off the mapper's ColumnProperty objects, which are shared,
    instance-independent metadata and do not carry flush history.
    """
    from sqlalchemy import inspect  # local import keeps the helper self-contained

    state = inspect(obj)
    changes = {}
    for attr in state.mapper.column_attrs:
        hist = state.attrs[attr.key].history
        if hist.has_changes():
            changes[attr.key] = {
                # deleted = previous value(s); added = pending value(s)
                "old": hist.deleted[0] if hist.deleted else None,
                "new": hist.added[0] if hist.added else None,
            }
    return changes


@event.listens_for(Session, "before_flush")
def audit_changes(session, flush_context, instances):
    """Create audit records for changes.

    Runs once per flush; inspects pending INSERTs (session.new), UPDATEs
    (session.dirty) and DELETEs (session.deleted) on models opted in via
    ``__audit__`` (see AuditMixin), and adds AuditLog rows to the same flush.
    """
    # Attached to the session by the DB dependency; None when no user context.
    user_id = getattr(session, "current_user_id", None)

    # NOTE(review): in before_flush, autogenerated primary keys have not been
    # assigned yet, so obj.id is presumably None for brand-new objects and the
    # INSERT audit row gets record_id NULL -- confirm, and consider an
    # after_flush listener for inserts if accurate record_ids are required.
    for obj in session.new:
        if hasattr(obj, "__audit__") and obj.__audit__:
            audit = AuditLog(
                table_name=obj.__tablename__,
                record_id=obj.id,
                action="INSERT",
                new_data=obj.to_dict(),
                user_id=user_id,
            )
            session.add(audit)

    # Updates: only audited when at least one column actually changed.
    for obj in session.dirty:
        if hasattr(obj, "__audit__") and obj.__audit__:
            changes = get_changes(obj)
            if changes:
                audit = AuditLog(
                    table_name=obj.__tablename__,
                    record_id=obj.id,
                    action="UPDATE",
                    old_data={k: v["old"] for k, v in changes.items()},
                    new_data={k: v["new"] for k, v in changes.items()},
                    changed_fields=list(changes.keys()),
                    user_id=user_id,
                )
                session.add(audit)

    # Deletes: capture the full row image before it is gone.
    for obj in session.deleted:
        if hasattr(obj, "__audit__") and obj.__audit__:
            audit = AuditLog(
                table_name=obj.__tablename__,
                record_id=obj.id,
                action="DELETE",
                old_data=obj.to_dict(),
                user_id=user_id,
            )
            session.add(audit)

Mark Models for Auditing

class AuditMixin:
    """Mixin that opts a model into audit logging."""

    # Checked by the before_flush listener to decide whether to write audit rows.
    __audit__ = True

    def to_dict(self) -> dict:
        """Snapshot every mapped column into a plain dict for the audit record."""
        snapshot = {}
        for column in self.__table__.columns:
            snapshot[column.name] = getattr(self, column.name)
        return snapshot


class User(Base, AuditMixin):
    """Example audited model: inheriting AuditMixin sets __audit__ = True."""
    __tablename__ = "users"
    # ... columns

Passing User Context

Set Session Variable

from sqlalchemy import text

async def get_db_with_user(
    request: Request,
    db: AsyncSession = Depends(get_db)
) -> AsyncGenerator[AsyncSession, None]:
    """Database session with user context for auditing.

    Sets the ``app.current_user_id`` GUC (read by the audit trigger via
    current_setting) and mirrors the id onto the session object for the
    application-level flush listener.
    """
    user = request.state.user

    if user:
        # set_config() with a bound parameter -- the original interpolated
        # user.id into the SQL string with an f-string, which is an injection
        # vector if the id is ever attacker-influenced. The trailing ``true``
        # makes the setting transaction-local, equivalent to SET LOCAL.
        await db.execute(
            text("SELECT set_config('app.current_user_id', :uid, true)"),
            {"uid": str(user.id)},
        )
        db.current_user_id = user.id

    yield db

FastAPI Middleware

from starlette.middleware.base import BaseHTTPMiddleware

class AuditContextMiddleware(BaseHTTPMiddleware):
    """Stash per-request metadata (ip, user agent, request id) for audit rows."""

    async def dispatch(self, request: Request, call_next):
        # request.client is None for some ASGI servers, test clients, and
        # unix-socket deployments -- guard before dereferencing .host.
        client = request.client
        request.state.audit_context = {
            "ip_address": client.host if client else None,
            "user_agent": request.headers.get("user-agent"),
            "request_id": request.headers.get("x-request-id"),
        }
        return await call_next(request)

Querying Audit Data

Record History

-- Full history of a user, oldest change first
SELECT
    action,
    changed_fields,
    old_data,
    new_data,
    user_id,
    created_at
FROM audit_log
WHERE table_name = 'users'
  AND record_id = 123
-- id tie-break makes the ordering deterministic when timestamps collide
ORDER BY created_at, id;

Who Changed What

-- Changes by a specific user, newest first
SELECT
    table_name,
    record_id,
    action,
    changed_fields,
    created_at
FROM audit_log
WHERE user_id = 456
-- deterministic ordering matters with LIMIT: without the id tie-break,
-- rows sharing a timestamp can straddle the cutoff arbitrarily
ORDER BY created_at DESC, id DESC
LIMIT 100;

Recent Activity

-- Activity in the last hour, busiest table/action first
SELECT
    table_name,
    action,
    COUNT(*) AS changes,
    COUNT(DISTINCT user_id) AS users
FROM audit_log
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY table_name, action
-- secondary keys make the row order deterministic on tied counts
ORDER BY changes DESC, table_name, action;

Reconstruct State at Point in Time

-- Get user state at a specific time: the latest image written at or before it.
-- ORDER BY ... LIMIT 1 replaces the ROW_NUMBER() subquery: same single row,
-- and the planner can stop at the first match instead of numbering the whole
-- history. NOTE: new_data is NULL when the latest event was a DELETE, i.e.
-- the record did not exist at that moment.
SELECT new_data
FROM audit_log
WHERE table_name = 'users'
  AND record_id = 123
  AND created_at <= '2024-01-15 14:00:00'
ORDER BY created_at DESC, id DESC
LIMIT 1;

Audit Log Retention

Archive Old Records

async def archive_old_audit_logs(db: AsyncSession, days: int = 90):
    """Move audit rows older than ``days`` days into audit_log_archive.

    Uses a single DELETE ... RETURNING feeding the INSERT. The original ran a
    separate INSERT then DELETE with the same cutoff: under READ COMMITTED, a
    row matching the cutoff committed between the two statements would be
    deleted without ever being archived. One statement moves each row exactly
    once, atomically.
    """
    cutoff = datetime.utcnow() - timedelta(days=days)

    await db.execute(text("""
        WITH moved AS (
            DELETE FROM audit_log
            WHERE created_at < :cutoff
            RETURNING *
        )
        INSERT INTO audit_log_archive
        SELECT * FROM moved
    """), {"cutoff": cutoff})

    await db.commit()

Summarize Before Deletion

-- Roll year-old audit detail up into per-table monthly counts before purging.
-- Column order in the SELECT matches the INSERT column list positionally.
INSERT INTO audit_summary (table_name, month, insert_count, update_count, delete_count)
SELECT
    table_name,
    DATE_TRUNC('month', created_at) AS month,
    COUNT(*) FILTER (WHERE action = 'INSERT') AS insert_count,
    COUNT(*) FILTER (WHERE action = 'UPDATE') AS update_count,
    COUNT(*) FILTER (WHERE action = 'DELETE') AS delete_count
FROM audit_log
WHERE created_at < NOW() - INTERVAL '1 year'
GROUP BY table_name, DATE_TRUNC('month', created_at);

Best Practices

  1. Audit only what matters — Not every table needs auditing
  2. Exclude sensitive fields — Don't log passwords or tokens
  3. Include user context — Who made the change
  4. Partition large tables — For performance and retention
  5. Index for common queries — table_name + record_id, created_at
  6. Set retention policy — Archive or delete old records
  7. Monitor table size — Audit tables grow fast
  8. Test restore from audit — Can you actually use the data?