Skip to content

Authorization

Control what authenticated users can access.

Authentication vs Authorization

Authentication: "Who are you?"
Authorization:  "What can you do?"

┌─────────────────────────────────────┐
│         Request arrives             │
├─────────────────────────────────────┤
│ 1. Authentication                   │
│    - Verify token/session           │
│    - Identify user                  │
├─────────────────────────────────────┤
│ 2. Authorization                    │
│    - Check permissions              │
│    - Verify resource access         │
├─────────────────────────────────────┤
│ 3. Execute action                   │
└─────────────────────────────────────┘

Authorization Models

Role-Based Access Control (RBAC)

Users are assigned roles, and each role carries a set of permissions.

from enum import Enum
from typing import Set

class Permission(str, Enum):
    """Permission identifiers, written as ``<resource>:<action>`` strings.

    Members also subclass ``str``, so each one compares equal to its value.
    """

    # Resource permissions
    READ_USERS = "users:read"
    WRITE_USERS = "users:write"
    DELETE_USERS = "users:delete"

    READ_POSTS = "posts:read"
    WRITE_POSTS = "posts:write"
    DELETE_POSTS = "posts:delete"

    # Admin permissions
    MANAGE_ROLES = "roles:manage"
    VIEW_ANALYTICS = "analytics:view"

class Role(str, Enum):
    """Roles a user can be assigned; each member is also its string value."""

    ADMIN = "admin"
    MODERATOR = "moderator"
    EDITOR = "editor"
    USER = "user"

# Permissions granted to each role.
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
    # Administrators hold every member of the Permission enum.
    Role.ADMIN: set(Permission),
    # Moderators can read users and fully manage posts.
    Role.MODERATOR: {
        Permission.READ_USERS,
        Permission.READ_POSTS,
        Permission.WRITE_POSTS,
        Permission.DELETE_POSTS,
    },
    # Editors can read and write posts.
    Role.EDITOR: {Permission.READ_POSTS, Permission.WRITE_POSTS},
    # Regular users can only read posts.
    Role.USER: {Permission.READ_POSTS},
}

def has_permission(user_role: Role, permission: Permission) -> bool:
    """Report whether ``user_role`` is granted ``permission``.

    A role with no entry in ``ROLE_PERMISSIONS`` is treated as holding no
    permissions, so the answer for it is always ``False``.
    """
    granted = ROLE_PERMISSIONS.get(user_role)
    if granted is None:
        return False
    return permission in granted

Attribute-Based Access Control (ABAC)

Access decisions are based on attributes of the user, the resource, and the environment.

from dataclasses import dataclass
from datetime import datetime, time
from typing import Any

@dataclass
class AccessContext:
    """Inputs handed to a policy when it evaluates one access request."""

    user: "User"  # user the decision is being made for
    resource: Any  # object being accessed
    action: str  # name of the requested operation, e.g. "edit"
    environment: dict  # extra request data, e.g. {"ip": ...}

class Policy:
    """Base class for access policies; subclasses override ``evaluate``."""

    def evaluate(self, context: AccessContext) -> bool:
        """Decide whether the request described by ``context`` is permitted.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

class OwnerPolicy(Policy):
    """Grant access only to the owner of the resource."""

    def evaluate(self, context: AccessContext) -> bool:
        """Return whether the resource's ``owner_id`` equals the user's ``id``."""
        owner_id = context.resource.owner_id
        requester_id = context.user.id
        return owner_id == requester_id

class DepartmentPolicy(Policy):
    """Grant access to resources that belong to the user's department."""

    def evaluate(self, context: AccessContext) -> bool:
        """Return whether resource and user share the same ``department``."""
        resource_department = context.resource.department
        user_department = context.user.department
        return resource_department == user_department

class BusinessHoursPolicy(Policy):
    """Grant access only between 09:00 and 17:00."""

    def evaluate(self, context: AccessContext) -> bool:
        """Return whether the current time falls inside business hours.

        The clock is read with ``datetime.now()`` (naive, local time) and
        both bounds are inclusive. ``context`` is not consulted.
        """
        opening, closing = time(9, 0), time(17, 0)
        current = datetime.now().time()
        return opening <= current <= closing

class PolicyEngine:
    """Evaluate a fixed list of policies against an access request."""

    def __init__(self, policies: list[Policy]):
        """Store the policies that every request must satisfy."""
        self.policies = policies

    def is_allowed(self, context: AccessContext) -> bool:
        """Return ``True`` only if every policy accepts ``context``.

        Policies are evaluated in list order and evaluation stops at the
        first rejection. An empty policy list allows everything.
        """
        for policy in self.policies:
            if not policy.evaluate(context):
                return False
        return True

# Usage
# Both policies must pass: the user has to own the document and the check
# has to happen during business hours.
engine = PolicyEngine([
    OwnerPolicy(),
    BusinessHoursPolicy(),
])

# Describe the request being authorized.
context = AccessContext(
    user=current_user,
    resource=document,
    action="edit",
    environment={"ip": request.client.host},
)

# Reject with 403 Forbidden when any policy refuses.
if not engine.is_allowed(context):
    raise HTTPException(403, "Access denied")

FastAPI Authorization

Permission Dependency

from fastapi import Depends, HTTPException, status
from functools import wraps

class PermissionChecker:
    """Callable dependency that requires the current user to hold permissions."""

    def __init__(self, required_permissions: list[Permission]):
        """Remember the permissions that must all be held."""
        self.required_permissions = required_permissions

    def __call__(self, user: User = Depends(get_current_user)) -> User:
        """Return ``user`` if their role grants every required permission.

        Raises:
            HTTPException: 403 naming the first required permission that
                the user's role does not grant.
        """
        for needed in self.required_permissions:
            if has_permission(user.role, needed):
                continue
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {needed.value}",
            )
        return user

# Usage
@app.get("/admin/users")
async def list_users(
    user: User = Depends(PermissionChecker([Permission.READ_USERS])),
):
    # The dependency has already rejected callers lacking "users:read".
    all_users = await get_all_users()
    return all_users

@app.delete("/admin/users/{user_id}")
async def delete_user(
    user_id: int,
    user: User = Depends(PermissionChecker([Permission.DELETE_USERS])),
):
    # The dependency has already rejected callers lacking "users:delete".
    outcome = await remove_user(user_id)
    return outcome

Resource-Level Authorization

async def get_post_with_access(
    post_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Post:
    """Load a post and return it only if ``user`` may access it.

    Raises:
        HTTPException: 404 when no post has ``post_id``; 403 when
            ``can_access_post`` refuses the user.
    """
    found = await db.get(Post, post_id)
    if not found:
        raise HTTPException(404, "Post not found")

    # Existence is checked before access, so a forbidden post yields 403.
    if can_access_post(user, found):
        return found

    raise HTTPException(403, "Access denied")

def can_access_post(user: User, post: Post) -> bool:
    """Return whether ``user`` may access ``post``.

    Access is granted when any of these holds, checked in this order:
    the user wrote the post, the post is published, the user is an admin,
    or the post's team is one of the user's teams.
    """
    return bool(
        post.author_id == user.id  # author
        or post.status == "published"  # public
        or user.role == Role.ADMIN  # admin
        or post.team_id in user.team_ids  # team member
    )

# Usage
@app.get("/posts/{post_id}")
async def get_post(post: Post = Depends(get_post_with_access)):
    # The dependency performs both the lookup and the access check, so the
    # handler only has to return what it was given.
    return post

Ownership Verification

async def verify_ownership(
    resource_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Load a resource and return it if ``user`` owns it or is an admin.

    Raises:
        HTTPException: 404 when no resource has ``resource_id``; 403 when
            the user is neither the owner nor an admin.
    """
    resource = await db.get(Resource, resource_id)
    if not resource:
        raise HTTPException(404, "Resource not found")

    may_proceed = resource.owner_id == user.id or user.role == Role.ADMIN
    if not may_proceed:
        raise HTTPException(403, "You don't own this resource")

    return resource

@app.put("/resources/{resource_id}")
async def update_resource(
    data: ResourceUpdate,
    resource: Resource = Depends(verify_ownership),
):
    # verify_ownership has already confirmed the caller is the owner or an
    # admin.
    # NOTE(review): this handler is itself named `update_resource`, so the
    # call below resolves to this function rather than to a separate
    # persistence helper, and is made with arguments in a different order
    # than the handler's parameters. Rename the handler or the helper.
    # User is verified to own this resource
    return await update_resource(resource, data)

Database-Level Authorization

Row-Level Security (PostgreSQL)

-- Enable RLS
-- Once enabled, rows of `documents` are reachable only through a policy.
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Policy: Users can only see their own documents
-- The current user id is read from the custom setting
-- `app.current_user_id`, which the application must set on the connection.
-- FOR ALL applies the policy to every command type.
CREATE POLICY user_documents ON documents
    FOR ALL
    USING (owner_id = current_setting('app.current_user_id')::int);

-- Policy: Admins can see all documents
-- Neither policy is declared RESTRICTIVE, so both use the default
-- permissive mode and a row is reachable when either one passes.
CREATE POLICY admin_documents ON documents
    FOR ALL
    USING (
        EXISTS (
            SELECT 1 FROM users
            WHERE id = current_setting('app.current_user_id')::int
            AND role = 'admin'
        )
    );
# Set user context for RLS
async def set_user_context(user_id: int, db: AsyncSession):
    """Store ``user_id`` in the ``app.current_user_id`` session setting.

    The row-level-security policies read this setting via
    ``current_setting('app.current_user_id')``.

    Args:
        user_id: Id of the authenticated user.
        db: Session whose connection receives the setting.

    Raises:
        TypeError, ValueError: If ``user_id`` cannot be converted to ``int``.
    """
    # `SET` cannot take bind parameters, so the value used to be spliced into
    # the SQL text. `set_config()` is an ordinary function call and accepts a
    # bound parameter; is_local=false keeps the session-level scope of `SET`.
    await db.execute(
        text("SELECT set_config('app.current_user_id', :user_id, false)"),
        # int() rejects anything that is not a plain integer id.
        {"user_id": str(int(user_id))},
    )

@app.middleware("http")
async def rls_middleware(request: Request, call_next):
    # Resolve the caller; a falsy result skips the RLS setup entirely.
    user = await get_current_user_from_request(request)
    if user:
        # NOTE(review): this session is opened and exited before
        # `call_next` runs, so the setting is applied to whatever connection
        # this session used. Whether the route handler's own session sees
        # the same connection/setting depends on `get_db` and the pool -
        # confirm, or set the context on the session the handler uses.
        # NOTE(review): `get_db` is also passed to `Depends(...)` elsewhere;
        # verify it supports `async with` as used here.
        async with get_db() as db:
            await set_user_context(user.id, db)
    return await call_next(request)

Query Filtering

from sqlalchemy import select
from sqlalchemy.orm import Query

class AuthorizedQuery:
    """Builders for SELECT statements pre-filtered by what a user may see."""

    @staticmethod
    def posts_for_user(user: User) -> Query:
        """Return a ``select(Post)`` restricted to posts ``user`` can access.

        Admins get the unfiltered statement. Everyone else gets posts they
        wrote, posts whose status is ``"published"``, and posts belonging to
        one of their teams.

        NOTE(review): the value is built with ``select()``, so the ``Query``
        return annotation looks inaccurate; it is left unchanged here to
        keep the signature stable.
        """
        # `or_` is not among the names imported next to this snippet, so it
        # is imported here to keep the non-admin branch from failing.
        from sqlalchemy import or_

        query = select(Post)

        if user.role == Role.ADMIN:
            return query  # No filter for admins

        return query.where(
            or_(
                Post.author_id == user.id,  # Own posts
                Post.status == "published",  # Public posts
                Post.team_id.in_(user.team_ids),  # Team posts
            )
        )

# Usage
@app.get("/posts")
async def list_posts(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    # The statement is already filtered to posts this user may access.
    statement = AuthorizedQuery.posts_for_user(user)
    rows = await db.execute(statement)
    return rows.scalars().all()

Frontend Authorization

Next.js Middleware

// middleware.ts
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
import { getToken } from "next-auth/jwt";

/**
 * Guards `/admin` routes: visitors without a token are redirected to
 * `/login`, and visitors whose token role is not "admin" are redirected to
 * `/unauthorized`. Every other request reaching this function is passed
 * through unchanged.
 */
export async function middleware(request: NextRequest) {
  const token = await getToken({ req: request });
  const onAdminRoute = request.nextUrl.pathname.startsWith("/admin");
  const redirectTo = (path: string) =>
    NextResponse.redirect(new URL(path, request.url));

  // Only admin routes are restricted here.
  if (!onAdminRoute) {
    return NextResponse.next();
  }

  if (!token) {
    return redirectTo("/login");
  }

  if (token.role !== "admin") {
    return redirectTo("/unauthorized");
  }

  return NextResponse.next();
}

// Paths this middleware runs on. Note that the middleware function only
// applies checks to paths starting with "/admin"; "/dashboard" requests are
// matched but passed through.
export const config = {
  matcher: ["/admin/:path*", "/dashboard/:path*"],
};

Component-Level Authorization

// components/PermissionGate.tsx
"use client";

import { useSession } from "next-auth/react";

/** Props accepted by `PermissionGate`. */
interface PermissionGateProps {
  /** Permissions that must all be present on the session user. */
  permissions: string[];
  /** Content rendered when every permission is present. */
  children: React.ReactNode;
  /** Content rendered otherwise; the component defaults it to `null`. */
  fallback?: React.ReactNode;
}

/**
 * Renders `children` only when the session user holds every permission in
 * `permissions`; otherwise renders `fallback`. With no session the fallback
 * is returned directly.
 */
export function PermissionGate({
  permissions,
  children,
  fallback = null,
}: PermissionGateProps) {
  const { data: session } = useSession();

  if (!session) return fallback;

  // A user without a `permissions` list fails any non-empty requirement.
  const allowed = permissions.every((required) =>
    session.user.permissions?.includes(required)
  );

  return <>{allowed ? children : fallback}</>;
}

// Usage
// The button renders only when the session user's permissions include
// "users:delete"; otherwise nothing renders, because `fallback` defaults to
// null.
<PermissionGate permissions={["users:delete"]}>
  <DeleteUserButton userId={user.id} />
</PermissionGate>;

Authorization Patterns

Hierarchical Permissions

class PermissionHierarchy:
    """Permissions inherit from parent."""

    # Maps a permission to the permissions it directly implies.
    HIERARCHY = {
        "posts:delete": ["posts:write"],
        "posts:write": ["posts:read"],
        "users:delete": ["users:write"],
        "users:write": ["users:read"],
    }

    @classmethod
    def expand_permissions(cls, permissions: set[str]) -> set[str]:
        """Expand permissions to include implied ones.

        Args:
            permissions: Permissions held directly. Not modified.

        Returns:
            A new set containing ``permissions`` plus everything reachable
            from them through ``HIERARCHY``.
        """
        expanded = set(permissions)
        pending = list(expanded)

        # Worklist traversal: each permission is expanded at most once, so a
        # cyclic hierarchy terminates instead of recursing without bound.
        while pending:
            perm = pending.pop()
            for implied in cls.HIERARCHY.get(perm, ()):
                if implied not in expanded:
                    expanded.add(implied)
                    pending.append(implied)

        return expanded

Contextual Permissions

def check_contextual_permission(
    user: User,
    resource: Any,
    action: str,
) -> bool:
    """Decide access from the resource's current state.

    Only ``Post`` resources are handled; anything else is denied.

    * Draft posts: any action is allowed for the author only.
    * Published posts: ``"read"`` is allowed for everyone; ``"edit"`` and
      ``"delete"`` are allowed for the author, admins and moderators.
    * Any other status or action is denied.
    """
    if not isinstance(resource, Post):
        return False

    if resource.status == "draft":
        return resource.author_id == user.id

    if resource.status != "published":
        return False

    if action == "read":
        return True

    if action in ["edit", "delete"]:
        return (
            resource.author_id == user.id
            or user.role in [Role.ADMIN, Role.MODERATOR]
        )

    return False

Temporary Elevated Access

from datetime import datetime, timedelta
import jwt

def create_elevated_token(user: User, permissions: list[str], duration: timedelta) -> str:
    """Create a short-lived token with elevated permissions.

    Args:
        user: User the token is issued to; ``user.id`` becomes the subject.
        permissions: Permissions carried in ``elevated_permissions``.
        duration: How long the token stays valid from now.

    Returns:
        The encoded JWT, signed with ``settings.secret_key`` using HS256.
    """
    from datetime import timezone

    # Timezone-aware replacement for the deprecated datetime.utcnow().
    expires_at = datetime.now(timezone.utc) + duration

    return jwt.encode(
        {
            # RFC 7519 defines `sub` as a string, and recent PyJWT releases
            # reject a non-string subject when decoding.
            "sub": str(user.id),
            "elevated_permissions": permissions,
            "exp": expires_at,
            "type": "elevated",
        },
        settings.secret_key,
        # State the algorithm explicitly so it matches the verifier's
        # `algorithms=["HS256"]`.
        algorithm="HS256",
    )

async def verify_elevated_permission(
    token: str,
    required_permission: str,
) -> bool:
    """Check that ``token`` is a valid elevated token granting a permission.

    Returns ``False`` when the token fails to decode (any
    ``jwt.InvalidTokenError``), when its ``type`` claim is not
    ``"elevated"``, or when ``required_permission`` is not listed in its
    ``elevated_permissions`` claim.
    """
    try:
        claims = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
    except jwt.InvalidTokenError:
        return False
    else:
        if claims.get("type") != "elevated":
            return False
        granted = claims.get("elevated_permissions", [])
        return required_permission in granted

Audit Logging

from datetime import datetime
import structlog

log = structlog.get_logger()

async def log_authorization_event(
    user: User,
    resource_type: str,
    resource_id: str,
    action: str,
    allowed: bool,
    reason: str = None,
):
    """Emit one ``authorization_check`` log entry describing a decision.

    The entry records the user's id and role, the resource type and id, the
    action, the outcome, an optional reason, and a timestamp taken from
    ``datetime.utcnow()`` in ISO format.
    """
    event_fields = {
        "user_id": user.id,
        "user_role": user.role,
        "resource_type": resource_type,
        "resource_id": resource_id,
        "action": action,
        "allowed": allowed,
        "reason": reason,
        "timestamp": datetime.utcnow().isoformat(),
    }
    log.info("authorization_check", **event_fields)

# Usage in authorization checks
async def authorize_action(user: User, post: Post, action: str) -> bool:
    """Decide whether ``user`` may perform ``action`` on ``post`` and log it.

    Args:
        user: User requesting the action.
        post: Post being acted on.
        action: Requested operation, e.g. ``"read"``, ``"edit"``, ``"delete"``.

    Returns:
        The authorization decision, which is also written to the audit log.
    """
    # `can_access_post` takes only (user, post), so passing `action` to it
    # raised TypeError. `check_contextual_permission` is the check in this
    # file that accepts an action.
    allowed = check_contextual_permission(user, post, action)

    await log_authorization_event(
        user=user,
        resource_type="post",
        resource_id=str(post.id),
        action=action,
        allowed=allowed,
        reason="owner" if post.author_id == user.id else "role",
    )

    return allowed

Best Practices Summary

| Practice | Implementation |
| --- | --- |
| Deny by default | Grant access explicitly; deny everything else |
| Check at every layer | API, service, database |
| Use least privilege | Grant only the minimal permissions needed |
| Audit all access | Log authorization decisions |
| Validate ownership | Verify the user owns the resource |
| Time-limit elevated access | Use short-lived elevated tokens |
| Don't trust the frontend | Always verify server-side |
| Test authorization | Write unit tests for permission logic |