Skip to content

Authentication

Verify user identity securely.

Authentication Methods

| Method | Use Case | Stateless | Notes |
|---|---|---|---|
| JWT | APIs, SPAs | Yes | Short-lived tokens |
| Sessions | Traditional web apps | No | Server-side state |
| API Keys | Service-to-service | Yes | Long-lived, rotatable |
| OAuth | Third-party login | Depends | Delegated auth |

JWT Authentication

Token Structure

Header.Payload.Signature

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.  ← Header (algorithm, type)
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ.  ← Payload (claims)
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c  ← Signature

Secure JWT Implementation

from datetime import datetime, timedelta
from typing import Optional
import jwt
from pydantic import BaseModel

# Configuration
# Token lifetimes: access tokens expire after minutes, refresh tokens after days.
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7

# Signing parameters: HS256 is symmetric, so the same secret signs and verifies.
JWT_ALGORITHM = "HS256"
JWT_SECRET = settings.secret_key  # From environment

class TokenPayload(BaseModel):
    """Validated claim set of a decoded JWT.

    Instances are built from the dict returned by ``jwt.decode`` so that the
    rest of the code works with typed attributes instead of raw dict keys.
    """

    sub: str  # Subject (user ID)
    exp: datetime  # Expiration
    iat: datetime  # Issued at
    jti: str  # JWT ID (for revocation)
    type: str  # "access" or "refresh"

def create_access_token(user_id: str) -> str:
    """Issue a signed, short-lived access token for ``user_id``.

    The token carries ``sub``, ``exp``, ``iat``, a random ``jti`` (so a single
    token can be revoked) and ``type="access"``.

    Args:
        user_id: Identifier stored in the ``sub`` claim.

    Returns:
        The encoded JWT, signed with ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    # Function-scope imports: neither name is among this snippet's
    # module-level imports, and without ``uuid`` the call raises NameError.
    import uuid
    from datetime import timezone

    # Timezone-aware "now"; ``datetime.utcnow()`` is deprecated since 3.12.
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "access",
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

def create_refresh_token(user_id: str) -> str:
    """Issue a signed refresh token for ``user_id``.

    Same claim layout as an access token, but with ``type="refresh"`` and a
    lifetime of ``REFRESH_TOKEN_EXPIRE_DAYS`` days.

    Args:
        user_id: Identifier stored in the ``sub`` claim.

    Returns:
        The encoded JWT, signed with ``JWT_SECRET`` using ``JWT_ALGORITHM``.
    """
    # Function-scope imports: neither name is among this snippet's
    # module-level imports, and without ``uuid`` the call raises NameError.
    import uuid
    from datetime import timezone

    # Timezone-aware "now"; ``datetime.utcnow()`` is deprecated since 3.12.
    now = datetime.now(timezone.utc)
    payload = {
        "sub": user_id,
        "exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "refresh",
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

def verify_token(token: str, expected_type: str = "access") -> TokenPayload:
    """Decode ``token``, check its signature, expiry and type, and return its claims.

    Args:
        token: Encoded JWT as received from the client.
        expected_type: Required value of the ``type`` claim
            (``"access"`` or ``"refresh"``).

    Returns:
        The validated claims as a ``TokenPayload``.

    Raises:
        HTTPException: 401 when the token is expired, malformed, badly
            signed, of the wrong type, or missing required claims.
    """
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(401, "Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(401, "Invalid token") from exc

    # Report a type mismatch as 401 rather than a bare ValueError, which
    # callers that do not wrap this function would surface as a 500.
    if payload.get("type") != expected_type:
        raise HTTPException(401, "Invalid token type")

    try:
        return TokenPayload(**payload)
    except ValueError as exc:
        # pydantic's ValidationError derives from ValueError; a correctly
        # signed token with missing or ill-typed claims is still invalid.
        raise HTTPException(401, "Invalid token") from exc

FastAPI JWT Dependency

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# Shared HTTP Bearer scheme; routes obtain the parsed Authorization header
# by declaring ``Depends(security)``.
security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the bearer access token on the request to an active ``User``.

    Args:
        credentials: Parsed ``Authorization: Bearer ...`` header.
        db: Database session used to load the user.

    Returns:
        The user named by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is invalid, expired or revoked,
            or when the user does not exist or is deactivated.
    """
    token = credentials.credentials

    try:
        payload = verify_token(token, expected_type="access")
    except HTTPException:
        raise
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Honour the revocation list: without this check an access token revoked
    # by /auth/logout would stay usable until it expires.
    if await is_token_revoked(payload.jti):
        raise HTTPException(401, "Token revoked")

    user = await db.get(User, payload.sub)
    if not user:
        raise HTTPException(401, "User not found")

    if not user.is_active:
        raise HTTPException(401, "User deactivated")

    return user

# Usage
@app.get("/me")
async def get_me(user: User = Depends(get_current_user)):
    """Return the user resolved by the ``get_current_user`` dependency."""
    return user

Token Refresh Flow

@app.post("/auth/refresh")
async def refresh_tokens(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    """Exchange a valid refresh token for a new access/refresh pair.

    The presented refresh token is revoked once the new pair is about to be
    issued (rotation), so each refresh token is meant to be used once.

    Raises:
        HTTPException: 401 when the token is revoked or the user is missing
            or inactive; ``verify_token`` raises for invalid tokens.
    """
    # NOTE(review): a bare ``str`` parameter on a POST route is presumably read
    # from the query string by FastAPI; confirm whether a body field or
    # cookie was intended for the refresh token.
    claims = verify_token(refresh_token, expected_type="refresh")
    token_id = claims.jti

    # Reject tokens that were already rotated or explicitly revoked.
    if await is_token_revoked(token_id):
        raise HTTPException(401, "Token revoked")

    account = await db.get(User, claims.sub)
    if not (account and account.is_active):
        raise HTTPException(401, "Invalid user")

    # Rotation: the old refresh token must not be accepted again.
    await revoke_token(token_id)

    new_access = create_access_token(account.id)
    new_refresh = create_refresh_token(account.id)
    return {
        "access_token": new_access,
        "refresh_token": new_refresh,
        "token_type": "bearer",
    }

Token Revocation

from redis.asyncio import Redis

# Async client: the revocation helpers ``await`` every Redis call, which the
# synchronous ``redis.Redis`` client does not support.
redis = Redis()

async def revoke_token(jti: str, expires_in: int = 86400 * 7):
    """Mark the token with ID ``jti`` as revoked for ``expires_in`` seconds.

    The marker is stored under ``revoked:<jti>`` with a TTL, so the entry
    disappears on its own once that period has elapsed.
    """
    revocation_key = f"revoked:{jti}"
    await redis.setex(revocation_key, expires_in, "1")

async def is_token_revoked(jti: str) -> bool:
    """Return ``True`` when a revocation marker exists for token ID ``jti``."""
    # EXISTS reports a key count (an int); coerce it so the function really
    # returns the ``bool`` its annotation promises.
    return bool(await redis.exists(f"revoked:{jti}"))

# Revoke all user tokens (e.g., on password change)
async def revoke_all_user_tokens(user_id: str):
    """Bump the per-user token version counter stored in Redis.

    NOTE(review): nothing visible here embeds this version in issued tokens
    or compares it during verification; confirm that check exists elsewhere,
    otherwise incrementing the counter invalidates nothing.
    """
    version_key = f"token_version:{user_id}"
    await redis.incr(version_key)

Session Authentication

Secure Session Setup

from starlette.middleware.sessions import SessionMiddleware
from itsdangerous import URLSafeTimedSerializer

# Signed cookie-based sessions.
app.add_middleware(
    SessionMiddleware,
    session_cookie="session",
    secret_key=settings.secret_key,
    https_only=True,  # Production only
    same_site="lax",
    max_age=3600,  # 1 hour
)

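# Session storage (this example imports the in-memory backend; substitute a Redis-backed backend for shared or persistent storage)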
from fastapi_sessions.backends.implementations import InMemoryBackend
from fastapi_sessions.session_verifier import SessionVerifier
from fastapi_sessions.frontends.implementations import SessionCookie, CookieParameters

# Cookie attributes applied to the session cookie.
cookie_params = CookieParameters(
    secure=True,  # HTTPS only
    httponly=True,
    samesite="lax",
)

# Session cookie frontend, signed with the application secret.
cookie = SessionCookie(
    cookie_name="session",
    secret_key=settings.secret_key,
    cookie_params=cookie_params,
    identifier="general_verifier",
    auto_error=True,
)

Password Security

Hashing with Argon2

from passlib.context import CryptContext

# Argon2 is the recommended algorithm; the cost settings below are passed
# to passlib's argon2 handler.
pwd_context = CryptContext(
    schemes=["argon2"],
    argon2__time_cost=3,
    argon2__memory_cost=65536,  # 64 MB
    argon2__parallelism=4,
    deprecated="auto",
)

def hash_password(password: str) -> str:
    """Hash ``password`` with the configured ``pwd_context`` and return the hash string."""
    hashed = pwd_context.hash(password)
    return hashed

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches the stored ``hashed_password``."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches

# Check if rehash needed (algorithm upgrade)
def needs_rehash(hashed_password: str) -> bool:
    """Return whether ``pwd_context`` considers ``hashed_password`` out of date."""
    outdated = pwd_context.needs_update(hashed_password)
    return outdated

Password Requirements

from pydantic import BaseModel, validator
import re

class PasswordPolicy:
    """Configurable password complexity rules.

    Each ``REQUIRE_*`` flag switches one character-class rule on or off;
    subclasses may override the flags or ``MIN_LENGTH``.
    """

    MIN_LENGTH = 12
    REQUIRE_UPPERCASE = True
    REQUIRE_LOWERCASE = True
    REQUIRE_DIGIT = True
    REQUIRE_SPECIAL = True

    # Patterns are compiled once at class creation rather than on every call.
    _UPPERCASE = re.compile(r"[A-Z]")
    _LOWERCASE = re.compile(r"[a-z]")
    _DIGIT = re.compile(r"\d")
    # Every printable ASCII punctuation character counts as "special"
    # (ranges ! to /, : to @, [ to `, { to ~), so "_", "-", "+", "=" and
    # similar characters are no longer rejected.
    _SPECIAL = re.compile(r"[!-/:-@\[-`{-~]")

    @classmethod
    def validate(cls, password: str) -> list[str]:
        """Check ``password`` against the policy.

        Args:
            password: Candidate password in plain text.

        Returns:
            Human-readable error messages, one per violated rule, in a fixed
            order; an empty list means the password is acceptable.
        """
        rules = (
            (
                len(password) >= cls.MIN_LENGTH,
                f"Password must be at least {cls.MIN_LENGTH} characters",
            ),
            (
                not cls.REQUIRE_UPPERCASE or cls._UPPERCASE.search(password),
                "Password must contain an uppercase letter",
            ),
            (
                not cls.REQUIRE_LOWERCASE or cls._LOWERCASE.search(password),
                "Password must contain a lowercase letter",
            ),
            (
                not cls.REQUIRE_DIGIT or cls._DIGIT.search(password),
                "Password must contain a digit",
            ),
            (
                not cls.REQUIRE_SPECIAL or cls._SPECIAL.search(password),
                "Password must contain a special character",
            ),
        )
        return [message for satisfied, message in rules if not satisfied]

Breach Detection

import hashlib
import httpx

async def is_password_breached(password: str) -> bool:
    """Check password against Have I Been Pwned.

    Only the first five hex characters of the password's SHA-1 digest are
    sent to the range API; the remaining suffix is compared locally against
    the returned ``SUFFIX:COUNT`` lines.

    Args:
        password: Candidate password in plain text.

    Returns:
        ``True`` when the digest suffix appears in the API response.

    Raises:
        httpx.HTTPStatusError: When the API answers with a non-success
            status, so an outage is not silently reported as "not breached".
    """
    digest = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = digest[:5], digest[5:]

    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.pwnedpasswords.com/range/{prefix}"
        )
    response.raise_for_status()

    # ``partition`` tolerates lines without exactly one ":" instead of
    # raising on tuple unpacking; only the suffix part matters here.
    return any(
        line.partition(":")[0].strip() == suffix
        for line in response.text.splitlines()
    )

Multi-Factor Authentication

TOTP Setup

import pyotp
import qrcode
from io import BytesIO

def generate_totp_secret() -> str:
    """Return a new random base32 secret from ``pyotp.random_base32``."""
    new_secret = pyotp.random_base32()
    return new_secret

def get_totp_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
    """Build the provisioning URI an authenticator app uses to enrol ``secret``.

    Args:
        secret: Base32 TOTP secret.
        email: Account name passed to ``provisioning_uri``.
        issuer: Issuer name passed to ``provisioning_uri``.
    """
    return pyotp.TOTP(secret).provisioning_uri(email, issuer_name=issuer)

def generate_qr_code(uri: str) -> bytes:
    """Render ``uri`` as a black-on-white QR code and return the PNG bytes."""
    code = qrcode.QRCode(version=1, box_size=10, border=5)
    code.add_data(uri)
    code.make(fit=True)
    image = code.make_image(fill_color="black", back_color="white")

    # Serialise the image into an in-memory buffer rather than a file.
    with BytesIO() as png_buffer:
        image.save(png_buffer, format="PNG")
        return png_buffer.getvalue()

def verify_totp(secret: str, code: str) -> bool:
    """Return whether ``code`` is a valid TOTP value for ``secret``."""
    # valid_window=1 is passed so that some clock skew is tolerated
    # (about 30s, assuming pyotp's default time step).
    return pyotp.TOTP(secret).verify(code, valid_window=1)

MFA Flow

@app.post("/auth/mfa/setup")
async def setup_mfa(user: User = Depends(get_current_user)):
    """Start TOTP enrolment for the authenticated user.

    A fresh secret is generated and parked in the cache for ten minutes; it
    only becomes the user's MFA secret after ``/auth/mfa/verify`` confirms a
    code generated from it.

    Returns:
        A dict with the raw ``secret`` (for manual entry) and ``qr_code``,
        the provisioning QR code as a base64-encoded PNG.
    """
    # Function-scope import: ``base64`` is not among this snippet's imports,
    # so the original raised NameError when building the response.
    import base64

    secret = generate_totp_secret()

    # Store temporarily (not activated yet)
    await cache.set(f"mfa_setup:{user.id}", secret, ex=600)

    uri = get_totp_uri(secret, user.email)
    qr_code = generate_qr_code(uri)

    return {
        "secret": secret,  # For manual entry
        "qr_code": base64.b64encode(qr_code).decode(),
    }

@app.post("/auth/mfa/verify")
async def verify_mfa_setup(
    code: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Complete TOTP enrolment by confirming a code from the pending secret.

    Args:
        code: TOTP code produced by the user's authenticator app.
        user: The authenticated user.
        db: Database session used to persist the activation.

    Returns:
        A dict with ten freshly generated one-time ``backup_codes``.

    Raises:
        HTTPException: 400 when no enrolment is pending or the code is wrong.
    """
    # Function-scope import: ``secrets`` is not among this snippet's imports,
    # so the original raised NameError when generating backup codes.
    import secrets

    # Read the ID before committing so later use does not depend on the
    # session reloading attributes it may expire on commit.
    user_id = user.id

    # Get pending secret
    secret = await cache.get(f"mfa_setup:{user_id}")
    if not secret:
        raise HTTPException(400, "No MFA setup in progress")

    # Verify code
    if not verify_totp(secret, code):
        raise HTTPException(400, "Invalid code")

    # Activate MFA
    user.mfa_secret = secret
    user.mfa_enabled = True
    await db.commit()

    # Generate backup codes
    backup_codes = [secrets.token_hex(4) for _ in range(10)]
    await store_backup_codes(user_id, backup_codes)

    return {"backup_codes": backup_codes}

Login Security

Brute Force Protection

from redis.asyncio import Redis

# Async client: the rate limiter ``await``s every Redis call, which the
# synchronous ``redis.Redis`` client does not support.
redis = Redis()

class LoginRateLimiter:
    """Fixed-window login attempt counter backed by Redis."""

    MAX_ATTEMPTS = 5
    LOCKOUT_SECONDS = 900  # 15 minutes

    @staticmethod
    def _key(identifier: str) -> str:
        """Return the Redis key holding the attempt counter for ``identifier``."""
        return f"login_attempts:{identifier}"

    @classmethod
    async def check_and_increment(cls, identifier: str) -> bool:
        """Returns True if login should be blocked.

        Every call counts as one attempt. The counter expires
        ``LOCKOUT_SECONDS`` after the first attempt in a window.
        """
        key = cls._key(identifier)

        attempts = await redis.incr(key)
        # INCR and EXPIRE are separate commands: if the process died between
        # them the counter would never expire and the identifier would stay
        # locked out. TTL == -1 means "key exists without expiry", so repair it.
        if attempts == 1 or await redis.ttl(key) == -1:
            await redis.expire(key, cls.LOCKOUT_SECONDS)

        return attempts > cls.MAX_ATTEMPTS

    @classmethod
    async def reset(cls, identifier: str):
        """Reset attempts on successful login."""
        await redis.delete(cls._key(identifier))

@app.post("/auth/login")
async def login(credentials: LoginCredentials, request: Request):
    """Authenticate by email and password, rate-limited per email and client host.

    Returns:
        A dict with a fresh ``access_token``.

    Raises:
        HTTPException: 429 when the attempt limit is exceeded, 401 when the
            credentials are wrong.
    """
    # Normalise the email used in the rate-limit key so that case or
    # whitespace variants of one address cannot each get their own counter.
    email_key = credentials.email.strip().lower()
    # ``request.client`` can be None (e.g. some test or proxy setups).
    client_host = request.client.host if request.client else "unknown"
    identifier = f"{email_key}:{client_host}"

    # Check rate limit
    if await LoginRateLimiter.check_and_increment(identifier):
        raise HTTPException(429, "Too many login attempts. Try again later.")

    # Verify credentials (the email is passed through exactly as submitted)
    user = await authenticate_user(credentials.email, credentials.password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    # Reset rate limit on success
    await LoginRateLimiter.reset(identifier)

    return {"access_token": create_access_token(user.id)}

Account Lockout

async def handle_failed_login(user: User, db: AsyncSession):
    """Record a failed login and lock the account after five failures.

    From the fifth failure on, ``locked_until`` is set to 15 minutes from
    now. The notification email is sent only when the account goes from
    unlocked to locked, and only after the change is committed.

    Args:
        user: Account the failed attempt was made against.
        db: Session used to persist the updated counters.
    """
    # One clock reading keeps last_failed_login and locked_until consistent.
    # Naive UTC is kept to stay comparable with already stored values.
    now = datetime.utcnow()
    was_locked = bool(user.locked_until and user.locked_until > now)
    # Read before commit so it does not depend on post-commit attribute reloads.
    recipient = user.email

    user.failed_login_attempts += 1
    user.last_failed_login = now

    lock_applied = user.failed_login_attempts >= 5
    if lock_applied:
        user.locked_until = now + timedelta(minutes=15)

    await db.commit()

    # Notify after the lock is persisted, and only on the transition, so
    # repeated failures against a locked account cannot flood the inbox.
    if lock_applied and not was_locked:
        await send_account_locked_email(recipient)

async def check_account_locked(user: User) -> bool:
    """Return whether ``user.locked_until`` is set and still in the future (naive UTC)."""
    lock_expiry = user.locked_until
    if not lock_expiry:
        return False
    return bool(lock_expiry > datetime.utcnow())

Security Headers

@app.middleware("http")
async def auth_security_headers(request: Request, call_next):
    """Mark responses to requests carrying an Authorization header as non-cacheable."""
    response = await call_next(request)

    # Requests without an Authorization header are passed through untouched.
    if not request.headers.get("Authorization"):
        return response

    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value

    return response

Logout

@app.post("/auth/logout")
async def logout(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Revoke the access token presented on this request.

    The token is verified first, so only a valid access token can be revoked;
    its ``jti`` is then added to the revocation list.
    """
    bearer_token = credentials.credentials
    claims = verify_token(bearer_token)

    await revoke_token(claims.jti)

    return {"message": "Logged out successfully"}

@app.post("/auth/logout-all")
async def logout_all_devices(
    user: User = Depends(get_current_user),
):
    """Invalidate all user sessions."""
    # Delegates to the per-user revocation helper, keyed by the user's ID.
    user_id = user.id
    await revoke_all_user_tokens(user_id)

    return {"message": "Logged out from all devices"}

Best Practices Summary

| Practice | Implementation |
|---|---|
| Short-lived access tokens | 15 minutes max |
| Secure refresh tokens | HTTP-only cookies, rotation |
| Strong password hashing | Argon2id |
| MFA support | TOTP with backup codes |
| Brute force protection | Rate limiting per IP + email |
| Account lockout | After N failed attempts |
| Token revocation | Redis blacklist |
| Secure headers | No-cache for auth responses |

See Also