# Authorization
Control what authenticated users can access.
## Authentication vs Authorization
- Authentication: "Who are you?"
- Authorization: "What can you do?"
┌─────────────────────────────────────┐
│ Request arrives │
├─────────────────────────────────────┤
│ 1. Authentication │
│ - Verify token/session │
│ - Identify user │
├─────────────────────────────────────┤
│ 2. Authorization │
│ - Check permissions │
│ - Verify resource access │
├─────────────────────────────────────┤
│ 3. Execute action │
└─────────────────────────────────────┘
## Authorization Models
### Role-Based Access Control (RBAC)
Users are assigned roles, and each role carries a set of permissions.
from enum import Enum
from typing import Set
class Permission(str, Enum):
    """Individual permissions, each encoded as a ``"<resource>:<action>"`` string."""

    # Users
    READ_USERS = "users:read"
    WRITE_USERS = "users:write"
    DELETE_USERS = "users:delete"

    # Posts
    READ_POSTS = "posts:read"
    WRITE_POSTS = "posts:write"
    DELETE_POSTS = "posts:delete"

    # Administration
    MANAGE_ROLES = "roles:manage"
    VIEW_ANALYTICS = "analytics:view"
class Role(str, Enum):
    """Roles a user can hold; each member's value is its lowercase name."""

    ADMIN = "admin"
    MODERATOR = "moderator"
    EDITOR = "editor"
    USER = "user"
# Permissions granted directly by each role.
ROLE_PERMISSIONS: dict[Role, Set[Permission]] = {
    # Admins hold every permission defined on the enum.
    Role.ADMIN: set(Permission),
    Role.MODERATOR: {
        Permission.READ_USERS,
        Permission.READ_POSTS,
        Permission.WRITE_POSTS,
        Permission.DELETE_POSTS,
    },
    Role.EDITOR: {
        Permission.READ_POSTS,
        Permission.WRITE_POSTS,
    },
    Role.USER: {
        Permission.READ_POSTS,
    },
}
def has_permission(user_role: Role, permission: Permission) -> bool:
    """Return whether ``user_role`` is granted ``permission``.

    A role with no entry in ``ROLE_PERMISSIONS`` is treated as having no
    permissions at all.
    """
    granted = ROLE_PERMISSIONS.get(user_role)
    if granted is None:
        return False
    return permission in granted
### Attribute-Based Access Control (ABAC)
Access decisions are based on attributes of the user, the resource, and the environment.
from dataclasses import dataclass
from datetime import datetime, time
from typing import Any
@dataclass
class AccessContext:
    """Everything a policy is given when deciding on one access attempt."""

    # The user attempting the access.
    user: "User"
    # The object being accessed; policies read whatever attributes they need.
    resource: Any
    # Name of the attempted operation, e.g. "edit".
    action: str
    # Free-form request/environment attributes, e.g. the client IP.
    environment: dict
class Policy:
    """Base class for access policies; subclasses override ``evaluate``."""

    def evaluate(self, context: AccessContext) -> bool:
        """Decide whether ``context`` is permitted.

        Raises:
            NotImplementedError: always, on the base class.
        """
        raise NotImplementedError
class OwnerPolicy(Policy):
    """User can access their own resources."""

    def evaluate(self, context: AccessContext) -> bool:
        """Return True when the resource's ``owner_id`` equals the user's ``id``."""
        owner_id = context.resource.owner_id
        return owner_id == context.user.id
class DepartmentPolicy(Policy):
    """User can access resources in their department."""

    def evaluate(self, context: AccessContext) -> bool:
        """Return True when resource and user report the same ``department``."""
        resource_department = context.resource.department
        return resource_department == context.user.department
class BusinessHoursPolicy(Policy):
    """Access only during business hours.

    The window is inclusive at both ends and is compared against the naive
    local wall-clock time returned by ``datetime.now()``. The day of the
    week is not considered.

    Args:
        start: First permitted time of day (default 09:00).
        end: Last permitted time of day (default 17:00).

    Raises:
        ValueError: If ``start`` is later than ``end``; windows that wrap
            past midnight are not supported.
    """

    def __init__(self, start: time = time(9, 0), end: time = time(17, 0)) -> None:
        if start > end:
            raise ValueError("start must not be later than end")
        self.start = start
        self.end = end

    def evaluate(self, context: AccessContext) -> bool:
        """Return True when the current local time lies within the window."""
        now = datetime.now().time()
        return self.start <= now <= self.end
class PolicyEngine:
    """Combine several policies; access requires every one of them to pass."""

    def __init__(self, policies: "list[Policy]"):
        """Store the policies to be evaluated on each ``is_allowed`` call."""
        self.policies = policies

    def is_allowed(self, context: "AccessContext") -> bool:
        """Return True only if at least one policy exists and all of them pass.

        An engine with no policies denies access: ``all()`` over an empty
        sequence is True, which would otherwise turn a misconfigured engine
        into allow-everything.
        """
        if not self.policies:
            return False
        return all(policy.evaluate(context) for policy in self.policies)
# Usage: build an engine from the policies that must all pass, describe the
# attempted access, and reject the request when the engine says no.
engine = PolicyEngine([OwnerPolicy(), BusinessHoursPolicy()])

context = AccessContext(
    user=current_user,
    resource=document,
    action="edit",
    environment={"ip": request.client.host},
)

if engine.is_allowed(context) is False:
    raise HTTPException(403, "Access denied")
## FastAPI Authorization
### Permission Dependency
from fastapi import Depends, HTTPException, status
from functools import wraps
class PermissionChecker:
    """Callable dependency that admits only users holding given permissions."""

    def __init__(self, required_permissions: list[Permission]):
        """Remember the permissions that every caller must hold."""
        self.required_permissions = required_permissions

    def __call__(self, user: User = Depends(get_current_user)) -> User:
        """Return ``user`` if their role grants every required permission.

        Raises:
            HTTPException: 403, naming the first required permission the
                user's role does not grant.
        """
        for permission in self.required_permissions:
            if has_permission(user.role, permission):
                continue
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied: {permission.value}",
            )
        return user
# Usage: the dependency rejects the request before the handler body runs.
@app.get("/admin/users")
async def list_users(
    user: User = Depends(PermissionChecker([Permission.READ_USERS])),
):
    """Return the result of ``get_all_users()``; requires ``users:read``."""
    users = await get_all_users()
    return users
@app.delete("/admin/users/{user_id}")
async def delete_user(
    user_id: int,
    user: User = Depends(PermissionChecker([Permission.DELETE_USERS])),
):
    """Remove the user with ``user_id``; requires ``users:delete``."""
    outcome = await remove_user(user_id)
    return outcome
### Resource-Level Authorization
async def get_post_with_access(
    post_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
) -> Post:
    """Load a post and return it only if ``user`` may access it.

    Raises:
        HTTPException: 404 when no post with ``post_id`` is found, 403 when
            ``can_access_post`` rejects the user.
    """
    post = await db.get(Post, post_id)

    # Existence is checked before access, so a missing post is always a 404.
    if not post:
        raise HTTPException(404, "Post not found")

    if can_access_post(user, post):
        return post
    raise HTTPException(403, "Access denied")
def can_access_post(user: User, post: Post) -> bool:
    """Return True if any access rule admits ``user`` to ``post``.

    The rules are tried in order and evaluation stops at the first match:
    author, published status, admin role, shared team.
    """
    return bool(
        post.author_id == user.id          # Owner can always access
        or post.status == "published"      # Published posts are public
        or user.role == Role.ADMIN         # Admins can access all
        or post.team_id in user.team_ids   # Team members can access drafts
    )
# Usage: the dependency performs both the lookup and the access check.
@app.get("/posts/{post_id}")
async def get_post(post: Post = Depends(get_post_with_access)):
    """Return the post already loaded and authorized by the dependency."""
    return post
### Ownership Verification
async def verify_ownership(
    resource_id: int,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Load a resource and return it if ``user`` owns it or is an admin.

    Raises:
        HTTPException: 404 when the resource does not exist, 403 when the
            user is neither its owner nor an admin.
    """
    resource = await db.get(Resource, resource_id)
    if not resource:
        raise HTTPException(404, "Resource not found")

    # Owners pass first; the admin role is consulted only for non-owners.
    if not (resource.owner_id == user.id or user.role == Role.ADMIN):
        raise HTTPException(403, "You don't own this resource")
    return resource
@app.put("/resources/{resource_id}")
async def update_resource_endpoint(
    data: ResourceUpdate,
    resource: Resource = Depends(verify_ownership),
):
    """Apply ``data`` to a resource the caller is allowed to modify.

    The handler is deliberately not named ``update_resource``: sharing the
    name with the service function it calls would rebind that name to the
    handler, and the call below would recurse into the handler instead of
    reaching the service. The route path is unchanged.
    """
    # verify_ownership has already confirmed the caller owns the resource
    # or is an admin.
    return await update_resource(resource, data)
## Database-Level Authorization
### Row-Level Security (PostgreSQL)
-- Enable RLS.
-- Note: the table owner and superusers bypass row security unless
-- ALTER TABLE documents FORCE ROW LEVEL SECURITY is also applied.
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- Both policies below are permissive, so a row is visible when EITHER matches.
--
-- current_setting(name, true) returns NULL instead of raising an error when
-- the setting has not been set, and NULLIF maps an empty value to NULL, so a
-- connection without a user context matches no rows rather than failing.

-- Policy: Users can only see their own documents
CREATE POLICY user_documents ON documents
    FOR ALL
    USING (
        owner_id = NULLIF(current_setting('app.current_user_id', true), '')::int
    );

-- Policy: Admins can see all documents
CREATE POLICY admin_documents ON documents
    FOR ALL
    USING (
        EXISTS (
            SELECT 1 FROM users
            WHERE id = NULLIF(current_setting('app.current_user_id', true), '')::int
              AND role = 'admin'
        )
    );
# Set user context for RLS
async def set_user_context(user_id: int, db: AsyncSession):
    """Expose ``user_id`` to the RLS policies as ``app.current_user_id``.

    ``set_config`` is used instead of a string-built ``SET`` statement for
    two reasons: the value travels as a bound parameter rather than being
    interpolated into SQL, and ``is_local = true`` limits the setting to the
    current transaction, so it cannot leak to the next request that reuses
    the same pooled connection.

    The setting is cleared when the transaction ends; call this again after
    a commit or rollback if more queries follow on the same session.
    """
    await db.execute(
        text("SELECT set_config('app.current_user_id', :user_id, true)"),
        {"user_id": str(user_id)},
    )
@app.middleware("http")
async def rls_middleware(request: Request, call_next):
    """Set the RLS user context on a session that stays open for the request.

    The context is attached to one specific database connection. Closing the
    session before the request is handled would discard it, so the request
    is processed inside the ``async with`` block and the session is exposed
    as ``request.state.db``.

    NOTE(review): only queries issued through ``request.state.db`` are
    guaranteed to run with this context; a handler that obtains a separate
    session may get a different connection. Confirm how handlers acquire
    their session.
    """
    user = await get_current_user_from_request(request)
    if not user:
        # No authenticated user: nothing to set, handle the request as-is.
        return await call_next(request)

    async with get_db() as db:
        await set_user_context(user.id, db)
        request.state.db = db
        return await call_next(request)
### Query Filtering
from sqlalchemy import or_, select
from sqlalchemy.orm import Query
from sqlalchemy.sql import Select
class AuthorizedQuery:
    """Builders for SELECT statements pre-filtered to what a user may see."""

    @staticmethod
    def posts_for_user(user: User) -> Select:
        """Return a SELECT over ``Post`` limited to posts ``user`` can access.

        Admins get the unfiltered statement. Everyone else gets posts they
        authored, posts whose status is "published", and posts belonging to
        one of their teams.

        The return type is ``Select`` because that is what ``select()``
        produces; it is not an ORM ``Query`` object.
        """
        query = select(Post)
        if user.role == Role.ADMIN:
            return query  # No filter for admins

        return query.where(
            or_(
                Post.author_id == user.id,  # Own posts
                Post.status == "published",  # Public posts
                Post.team_id.in_(user.team_ids),  # Team posts
            )
        )
# Usage: filtering happens in the database, so inaccessible rows are never loaded.
@app.get("/posts")
async def list_posts(
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return every post the current user is allowed to access."""
    result = await db.execute(AuthorizedQuery.posts_for_user(user))
    posts = result.scalars().all()
    return posts
## Frontend Authorization
### Next.js Middleware
// middleware.ts
import { NextResponse } from "next/server";
import type { NextRequest } from "next/server";
import { getToken } from "next-auth/jwt";
/**
 * Gate every route selected by `config.matcher`.
 *
 * - Any matched route requires a signed-in user; otherwise redirect to /login.
 * - Routes under /admin additionally require the "admin" role; otherwise
 *   redirect to /unauthorized.
 *
 * Previously only /admin was checked, so /dashboard was listed in the matcher
 * but reachable without a session.
 */
export async function middleware(request: NextRequest) {
  const token = await getToken({ req: request });

  // All matched routes need an authenticated user.
  if (!token) {
    return NextResponse.redirect(new URL("/login", request.url));
  }

  // Protect admin routes
  const { pathname } = request.nextUrl;
  const isAdminRoute = pathname === "/admin" || pathname.startsWith("/admin/");
  if (isAdminRoute && token.role !== "admin") {
    return NextResponse.redirect(new URL("/unauthorized", request.url));
  }

  return NextResponse.next();
}

export const config = {
  matcher: ["/admin/:path*", "/dashboard/:path*"],
};
### Component-Level Authorization
// components/PermissionGate.tsx
"use client";
import { useSession } from "next-auth/react";
/** Props accepted by `PermissionGate`. */
interface PermissionGateProps {
  /** Permissions the signed-in user must ALL hold for `children` to render. */
  permissions: string[];
  /** Content shown when the user holds every listed permission. */
  children: React.ReactNode;
  /** Content shown otherwise. */
  fallback?: React.ReactNode;
}
/**
 * Render `children` only when the current session's user holds every
 * permission in `permissions`; render `fallback` (default `null`) otherwise,
 * including when there is no session.
 */
export function PermissionGate({
  permissions,
  children,
  fallback = null,
}: PermissionGateProps) {
  const { data: session } = useSession();
  if (!session) return fallback;

  const granted = session.user.permissions;
  const hasPermission = permissions.every((required) =>
    granted?.includes(required)
  );

  if (!hasPermission) {
    return <>{fallback}</>;
  }
  return <>{children}</>;
}
// Usage: the button renders only for users holding "users:delete".
<PermissionGate permissions={["users:delete"]}>
  <DeleteUserButton userId={user.id} />
</PermissionGate>;
## Authorization Patterns
### Hierarchical Permissions
class PermissionHierarchy:
    """Permissions inherit from parent."""

    # Each key implies the permissions listed as its value.
    HIERARCHY = {
        "posts:delete": ["posts:write"],
        "posts:write": ["posts:read"],
        "users:delete": ["users:write"],
        "users:write": ["users:read"],
    }

    @classmethod
    def expand_permissions(cls, permissions: set[str]) -> set[str]:
        """Expand permissions to include implied ones.

        Follows ``HIERARCHY`` transitively, so ``"posts:delete"`` yields
        ``"posts:write"`` and ``"posts:read"`` as well. Each permission is
        visited at most once, so a cyclic hierarchy terminates instead of
        recursing without bound.

        Args:
            permissions: The permissions held directly.

        Returns:
            A new set with the given permissions plus every permission they
            imply; the argument is not modified.
        """
        expanded = set(permissions)
        pending = list(expanded)
        while pending:
            current = pending.pop()
            for implied in cls.HIERARCHY.get(current, ()):
                if implied not in expanded:
                    expanded.add(implied)
                    pending.append(implied)
        return expanded
### Contextual Permissions
def check_contextual_permission(
    user: User,
    resource: Any,
    action: str,
) -> bool:
    """Check permission based on resource state.

    Only ``Post`` resources are handled; anything else is denied.

    * Draft posts: allowed for the author only, whatever the action.
    * Published posts: "read" is open to everyone; "edit" and "delete" need
      the author or an admin/moderator.
    * Any other status or action: denied.
    """
    if not isinstance(resource, Post):
        return False

    # Draft posts can only be edited by author
    if resource.status == "draft":
        return resource.author_id == user.id

    if resource.status != "published":
        return False

    if action == "read":
        return True
    if action in ("edit", "delete"):
        return (
            resource.author_id == user.id
            or user.role in (Role.ADMIN, Role.MODERATOR)
        )
    return False
### Temporary Elevated Access
from datetime import datetime, timedelta
import jwt
def create_elevated_token(user: User, permissions: list[str], duration: timedelta) -> str:
    """Create a short-lived token with elevated permissions.

    Args:
        user: The user the token is issued to; ``user.id`` becomes ``sub``.
        permissions: Permissions granted for the token's lifetime.
        duration: How long the token stays valid from now.

    Returns:
        The encoded JWT, signed with ``settings.secret_key`` using HS256.
    """
    # Local import keeps this snippet self-contained.
    from datetime import timezone

    # Timezone-aware "now": datetime.utcnow() is naive and deprecated.
    expires_at = datetime.now(timezone.utc) + duration
    return jwt.encode(
        {
            # The JWT spec defines "sub" as a string, and recent PyJWT
            # releases reject non-string subjects when decoding.
            "sub": str(user.id),
            "elevated_permissions": list(permissions),
            "exp": expires_at,
            "type": "elevated",
        },
        settings.secret_key,
        # Stated explicitly so it visibly matches the algorithm the verifier accepts.
        algorithm="HS256",
    )
async def verify_elevated_permission(
    token: str,
    required_permission: str,
) -> bool:
    """Verify elevated access token.

    Returns True only when the token has a valid HS256 signature, carries an
    unexpired ``exp`` claim, is marked ``type == "elevated"`` and lists
    ``required_permission``. Every failure returns False.

    NOTE(review): the token's ``sub`` is not compared with the caller here;
    confirm the call site ties the token to the authenticated user.
    """
    try:
        payload = jwt.decode(
            token,
            settings.secret_key,
            algorithms=["HS256"],
            # Without this, a token that simply omits "exp" never expires.
            options={"require": ["exp"]},
        )
    except jwt.InvalidTokenError:
        return False

    if payload.get("type") != "elevated":
        return False
    # "or []" also covers a claim that is present but null.
    granted = payload.get("elevated_permissions") or []
    return required_permission in granted
## Audit Logging
from datetime import datetime
import structlog
log = structlog.get_logger()


async def log_authorization_event(
    user: User,
    resource_type: str,
    resource_id: str,
    action: str,
    allowed: bool,
    reason: "str | None" = None,
):
    """Emit one structured ``authorization_check`` log entry.

    Args:
        user: The user whose access was checked; id and role are logged.
        resource_type: Kind of resource, e.g. "post".
        resource_id: Identifier of the resource.
        action: The attempted action.
        allowed: The outcome of the check.
        reason: Optional short explanation of the outcome.
    """
    # Local import keeps this snippet self-contained.
    from datetime import timezone

    log.info(
        "authorization_check",
        user_id=user.id,
        user_role=user.role,
        resource_type=resource_type,
        resource_id=resource_id,
        action=action,
        allowed=allowed,
        reason=reason,
        # Timezone-aware UTC, so the ISO string carries an explicit offset;
        # datetime.utcnow() is naive and deprecated.
        timestamp=datetime.now(timezone.utc).isoformat(),
    )
# Usage in authorization checks
async def authorize_action(user: User, post: Post, action: str) -> bool:
    """Decide whether ``user`` may perform ``action`` on ``post`` and log it.

    The decision comes from ``check_contextual_permission``, which accepts
    the action. ``can_access_post`` takes only ``(user, post)``, so passing
    it a third argument raises ``TypeError``.

    Returns:
        The authorization decision, after it has been logged.
    """
    allowed = check_contextual_permission(user, post, action)

    # Do not report "owner"/"role" as the reason for a denied request.
    if not allowed:
        reason = "denied"
    elif post.author_id == user.id:
        reason = "owner"
    else:
        reason = "role"

    await log_authorization_event(
        user=user,
        resource_type="post",
        resource_id=str(post.id),
        action=action,
        allowed=allowed,
        reason=reason,
    )
    return allowed
## Best Practices Summary
| Practice | Implementation |
|---|---|
| Deny by default | Grant access explicitly; deny everything else |
| Check at every layer | API, service, database |
| Use least privilege | Minimal permissions needed |
| Audit all access | Log authorization decisions |
| Validate ownership | Verify user owns resource |
| Time-limit elevated access | Short-lived elevated tokens |
| Don't trust frontend | Always verify server-side |
| Test authorization | Unit tests for permission logic |