Authentication
Verify user identity securely.
Authentication Methods
| Method | Use Case | Stateless | Notes |
|--------|----------|-----------|-------|
| JWT | APIs, SPAs | Yes | Short-lived tokens |
| Sessions | Traditional web apps | No | Server-side state |
| API Keys | Service-to-service | Yes | Long-lived, rotatable |
| OAuth | Third-party login | Depends | Delegated auth |
JWT Authentication
Token Structure
Header.Payload.Signature
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9. ← Header (algorithm, type)
eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4ifQ. ← Payload (claims)
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c ← Signature
Secure JWT Implementation
from datetime import datetime, timedelta
from typing import Optional
import jwt
from pydantic import BaseModel
# Configuration
# Signing key shared by token creation and verification.
JWT_SECRET = settings.secret_key # From environment
# Algorithm name passed to jwt.encode / jwt.decode.
JWT_ALGORITHM = "HS256"
# Access-token lifetime, in minutes.
ACCESS_TOKEN_EXPIRE_MINUTES = 15
# Refresh-token lifetime, in days.
REFRESH_TOKEN_EXPIRE_DAYS = 7
class TokenPayload(BaseModel):
    """Claims carried by the tokens this module issues and verifies."""

    # Subject: ID of the user the token was issued to.
    sub: str
    # Expiration time.
    exp: datetime
    # Issued-at time.
    iat: datetime
    # Unique token ID; the revocation helpers key on it.
    jti: str
    # Token kind: "access" or "refresh".
    type: str
def create_access_token(user_id: str) -> str:
    """Create a signed, short-lived access token for ``user_id``.

    The token carries ``sub``, ``exp``, ``iat``, a random ``jti`` and
    ``type="access"``. It expires ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes
    after it is issued.

    Args:
        user_id: ID of the user the token is issued to.

    Returns:
        The encoded JWT.
    """
    # Imported locally so the function does not rely on names missing from
    # the surrounding import list.
    import uuid
    from datetime import timezone

    # Timezone-aware "now": ``datetime.utcnow()`` is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    now = datetime.now(timezone.utc)
    payload = {
        # ``sub`` must be a string (TokenPayload declares it as ``str``).
        "sub": str(user_id),
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "access",
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def create_refresh_token(user_id: str) -> str:
    """Create a signed refresh token for ``user_id``.

    The token carries ``sub``, ``exp``, ``iat``, a random ``jti`` and
    ``type="refresh"``. It expires ``REFRESH_TOKEN_EXPIRE_DAYS`` days after
    it is issued.

    Args:
        user_id: ID of the user the token is issued to.

    Returns:
        The encoded JWT.
    """
    # Imported locally so the function does not rely on names missing from
    # the surrounding import list.
    import uuid
    from datetime import timezone

    # Timezone-aware "now": ``datetime.utcnow()`` is deprecated and returns a
    # naive value that is easy to misinterpret as local time.
    now = datetime.now(timezone.utc)
    payload = {
        # ``sub`` must be a string (TokenPayload declares it as ``str``).
        "sub": str(user_id),
        "exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "refresh",
    }
    return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
def verify_token(token: str, expected_type: str = "access") -> TokenPayload:
    """Decode a JWT and return its claims as a ``TokenPayload``.

    Args:
        token: The encoded JWT.
        expected_type: Required value of the ``type`` claim
            (``"access"`` or ``"refresh"``).

    Returns:
        The validated claims.

    Raises:
        HTTPException: 401 when the token is expired, fails decoding, has
            the wrong ``type``, or lacks claims ``TokenPayload`` requires.
    """
    try:
        payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError as exc:
        raise HTTPException(401, "Token expired") from exc
    except jwt.InvalidTokenError as exc:
        raise HTTPException(401, "Invalid token") from exc

    # A type mismatch is an authentication failure, so report it as 401
    # rather than letting a bare ValueError surface as a server error in
    # callers that only expect HTTPException.
    if payload.get("type") != expected_type:
        raise HTTPException(401, "Invalid token type")

    try:
        return TokenPayload(**payload)
    except ValueError as exc:
        # pydantic's ValidationError is a ValueError: the token is validly
        # signed but its claims do not fit TokenPayload.
        raise HTTPException(401, "Invalid token") from exc
FastAPI JWT Dependency
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# Bearer scheme instance; used via Depends(security) to obtain the request's
# HTTPAuthorizationCredentials.
security = HTTPBearer()
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the active user identified by the request's bearer token.

    Args:
        credentials: Bearer credentials extracted by ``security``.
        db: Database session used to load the user.

    Returns:
        The ``User`` whose ID is in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is invalid, expired, revoked, or
            the user is missing or deactivated.
    """
    token = credentials.credentials
    try:
        payload = verify_token(token, expected_type="access")
    except HTTPException:
        raise
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # /auth/logout revokes the access token's jti; without this check a
    # logged-out token would keep working until it expires.
    if await is_token_revoked(payload.jti):
        raise HTTPException(401, "Token revoked")

    user = await db.get(User, payload.sub)
    if not user:
        raise HTTPException(401, "User not found")
    if not user.is_active:
        raise HTTPException(401, "User deactivated")
    return user
# Usage
@app.get("/me")
async def get_me(user: User = Depends(get_current_user)):
    """Return the user resolved by ``get_current_user``."""
    current_user = user
    return current_user
Token Refresh Flow
@app.post("/auth/refresh")
async def refresh_tokens(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    """Exchange a refresh token for a new access/refresh pair.

    The presented refresh token is revoked before the new pair is issued
    (rotation). Responds with 401 if the token is revoked or its user is
    missing or inactive.
    """
    # Validate the presented token and pull out its ID.
    claims = verify_token(refresh_token, expected_type="refresh")
    token_id = claims.jti

    # Reject tokens already on the revocation list.
    if await is_token_revoked(token_id):
        raise HTTPException(401, "Token revoked")

    # The token's subject must still be an active user.
    account = await db.get(User, claims.sub)
    if not (account and account.is_active):
        raise HTTPException(401, "Invalid user")

    # Rotation: retire the old refresh token before issuing replacements.
    await revoke_token(token_id)

    new_access = create_access_token(account.id)
    new_refresh = create_refresh_token(account.id)
    return {
        "access_token": new_access,
        "refresh_token": new_refresh,
        "token_type": "bearer",
    }
Token Revocation
# The helpers below await every Redis call, so the asyncio client is needed;
# the synchronous ``redis.Redis`` returns plain values that cannot be awaited.
from redis.asyncio import Redis

redis = Redis()
async def revoke_token(jti: str, expires_in: int = 86400 * 7):
    """Put a token ID on the revocation list.

    Args:
        jti: ID of the token to revoke.
        expires_in: Seconds the revocation entry is kept (default: 7 days).
    """
    revocation_key = f"revoked:{jti}"
    await redis.setex(revocation_key, expires_in, "1")
async def is_token_revoked(jti: str) -> bool:
    """Check if token is revoked.

    Args:
        jti: ID of the token to look up.

    Returns:
        True if a ``revoked:{jti}`` entry exists, False otherwise.
    """
    # EXISTS returns a count of matching keys; convert it so the function
    # honours its declared ``bool`` return type.
    return bool(await redis.exists(f"revoked:{jti}"))
# Revoke all user tokens (e.g., on password change)
async def revoke_all_user_tokens(user_id: str):
    """Increment the user's token-version counter.

    NOTE(review): the counter only invalidates tokens if verification
    compares it against a version stored in the token - confirm that such a
    check exists where tokens are verified.
    """
    version_key = f"token_version:{user_id}"
    await redis.incr(version_key)
Session Authentication
Secure Session Setup
from starlette.middleware.sessions import SessionMiddleware
from itsdangerous import URLSafeTimedSerializer
# Register session middleware: "session" cookie, one-hour lifetime,
# SameSite=Lax, sent over HTTPS only.
app.add_middleware(
    SessionMiddleware,
    secret_key=settings.secret_key,
    session_cookie="session",
    max_age=3600,  # 1 hour
    same_site="lax",
    https_only=True,  # Production only
)
# Session storage (the imports below use the in-memory backend; substitute a Redis-backed backend for shared storage)
from fastapi_sessions.backends.implementations import InMemoryBackend
from fastapi_sessions.session_verifier import SessionVerifier
from fastapi_sessions.frontends.implementations import SessionCookie, CookieParameters
# Cookie attributes: hidden from JavaScript, HTTPS only, SameSite=Lax.
cookie_params = CookieParameters(
    httponly=True,
    secure=True,  # HTTPS only
    samesite="lax",
)

# Session cookie frontend built on the parameters above.
cookie = SessionCookie(
    cookie_name="session",
    identifier="general_verifier",
    auto_error=True,
    secret_key=settings.secret_key,
    cookie_params=cookie_params,
)
Password Security
Hashing with Argon2
from passlib.context import CryptContext
# Argon2 is the recommended algorithm
# Shared hashing context used by the password helpers.
pwd_context = CryptContext(
    schemes=["argon2"],
    deprecated="auto",
    argon2__memory_cost=65536,  # 64 MB
    argon2__time_cost=3,
    argon2__parallelism=4,
)
def hash_password(password: str) -> str:
    """Return the hash ``pwd_context`` produces for ``password``."""
    hashed = pwd_context.hash(password)
    return hashed
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Return whether ``plain_password`` matches ``hashed_password``."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
# Check if rehash needed (algorithm upgrade)
def needs_rehash(hashed_password: str) -> bool:
    """Return whether ``pwd_context`` reports the hash as needing an update."""
    outdated = pwd_context.needs_update(hashed_password)
    return outdated
Password Requirements
from pydantic import BaseModel, validator
import re
class PasswordPolicy:
    """Password-complexity rules, adjustable through class attributes."""

    # Minimum number of characters.
    MIN_LENGTH = 12
    # Toggles for each character-class requirement.
    REQUIRE_UPPERCASE = True
    REQUIRE_LOWERCASE = True
    REQUIRE_DIGIT = True
    REQUIRE_SPECIAL = True

    @classmethod
    def validate(cls, password: str) -> list[str]:
        """Check ``password`` against the policy.

        Args:
            password: The candidate password.

        Returns:
            One message per violated rule, in a fixed order (length,
            uppercase, lowercase, digit, special); empty when the password
            satisfies every enabled rule.
        """
        errors = []
        if len(password) < cls.MIN_LENGTH:
            errors.append(f"Password must be at least {cls.MIN_LENGTH} characters")

        # (enabled, pattern that must match, message when it does not)
        character_rules = (
            (
                cls.REQUIRE_UPPERCASE,
                r"[A-Z]",
                "Password must contain an uppercase letter",
            ),
            (
                cls.REQUIRE_LOWERCASE,
                r"[a-z]",
                "Password must contain a lowercase letter",
            ),
            (cls.REQUIRE_DIGIT, r"\d", "Password must contain a digit"),
            (
                cls.REQUIRE_SPECIAL,
                # Any character that is not a letter, digit or whitespace
                # (underscore included). A fixed whitelist wrongly rejects
                # common symbols such as "-", "_", "+" or "=".
                r"[^\w\s]|_",
                "Password must contain a special character",
            ),
        )
        errors.extend(
            message
            for enabled, pattern, message in character_rules
            if enabled and not re.search(pattern, password)
        )
        return errors
Breach Detection
import hashlib
import httpx
async def is_password_breached(password: str) -> bool:
    """Check password against Have I Been Pwned.

    Only the first five hex characters of the password's SHA-1 digest are
    sent to the service; the remainder is compared locally against the
    returned suffixes.

    Args:
        password: The candidate password.

    Returns:
        True if the digest suffix appears in the service's response.

    Raises:
        httpx.HTTPStatusError: If the service answers with an error status.
    """
    digest = hashlib.sha1(password.encode()).hexdigest().upper()
    prefix, suffix = digest[:5], digest[5:]

    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"https://api.pwnedpasswords.com/range/{prefix}"
        )

    # Fail loudly on 4xx/5xx instead of parsing an error body as hash data.
    response.raise_for_status()

    # Each line is "<suffix>:<count>". partition() tolerates lines without a
    # colon, which would break two-value unpacking of split(":").
    return any(
        line.partition(":")[0].strip().upper() == suffix
        for line in response.text.splitlines()
    )
Multi-Factor Authentication
TOTP Setup
import pyotp
import qrcode
from io import BytesIO
def generate_totp_secret() -> str:
    """Return a new random base32 secret from ``pyotp``."""
    new_secret = pyotp.random_base32()
    return new_secret
def get_totp_uri(secret: str, email: str, issuer: str = "MyApp") -> str:
    """Return the provisioning URI for ``secret``, labelled with ``email`` and ``issuer``."""
    return pyotp.TOTP(secret).provisioning_uri(email, issuer_name=issuer)
def generate_qr_code(uri: str) -> bytes:
    """Render ``uri`` as a black-on-white QR code and return the PNG bytes."""
    code = qrcode.QRCode(version=1, box_size=10, border=5)
    code.add_data(uri)
    code.make(fit=True)

    image = code.make_image(fill_color="black", back_color="white")

    # Serialise the image into memory rather than onto disk.
    png_stream = BytesIO()
    image.save(png_stream, format="PNG")
    return png_stream.getvalue()
def verify_totp(secret: str, code: str) -> bool:
    """Return whether ``code`` is a valid TOTP code for ``secret``."""
    # valid_window=1: allow 30s clock skew
    return pyotp.TOTP(secret).verify(code, valid_window=1)
MFA Flow
@app.post("/auth/mfa/setup")
async def setup_mfa(user: User = Depends(get_current_user)):
    """Start MFA enrolment for the current user.

    Generates a TOTP secret, caches it under ``mfa_setup:{user.id}`` for
    600 seconds, and returns it together with a QR code.

    Returns:
        A dict with ``secret`` (for manual entry) and ``qr_code`` (the PNG,
        base64-encoded).
    """
    # Imported locally: the surrounding import list does not include it.
    import base64

    # Generate secret
    secret = generate_totp_secret()

    # Store temporarily (not activated yet)
    await cache.set(f"mfa_setup:{user.id}", secret, ex=600)

    # Generate QR code
    uri = get_totp_uri(secret, user.email)
    qr_code = generate_qr_code(uri)

    return {
        "secret": secret,  # For manual entry
        "qr_code": base64.b64encode(qr_code).decode(),
    }
@app.post("/auth/mfa/verify")
async def verify_mfa_setup(
    code: str,
    user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Finish MFA enrolment by confirming a TOTP code.

    Args:
        code: TOTP code produced from the pending secret.
        user: The authenticated user.
        db: Database session used to persist the activation.

    Returns:
        A dict with the newly generated ``backup_codes``.

    Raises:
        HTTPException: 400 when no setup is pending or the code is invalid.
    """
    # Imported locally: the surrounding import list does not include it.
    import secrets

    # Read the ID up front: an async session may expire attributes on
    # commit, and reading them afterwards would need a lazy load.
    user_id = user.id

    # Get pending secret
    # NOTE(review): a Redis-style cache may return bytes here - confirm the
    # cache decodes responses before the value reaches pyotp.
    secret = await cache.get(f"mfa_setup:{user_id}")
    if not secret:
        raise HTTPException(400, "No MFA setup in progress")

    # Verify code
    if not verify_totp(secret, code):
        raise HTTPException(400, "Invalid code")

    # Activate MFA
    user.mfa_secret = secret
    user.mfa_enabled = True
    await db.commit()

    # Generate backup codes
    backup_codes = [secrets.token_hex(4) for _ in range(10)]
    await store_backup_codes(user_id, backup_codes)

    return {"backup_codes": backup_codes}
Login Security
Brute Force Protection
# The rate limiter below awaits every Redis call, so the asyncio client is
# needed; the synchronous ``redis.Redis`` returns plain values that cannot
# be awaited.
from redis.asyncio import Redis

redis = Redis()
class LoginRateLimiter:
    """Redis-backed counter of login attempts per identifier."""

    # Attempts allowed within one window before further logins are blocked.
    MAX_ATTEMPTS = 5
    # Length of the counting window.
    LOCKOUT_SECONDS = 900  # 15 minutes

    @classmethod
    async def check_and_increment(cls, identifier: str) -> bool:
        """Returns True if login should be blocked.

        Increments the attempt counter for ``identifier`` and reports
        whether it now exceeds ``MAX_ATTEMPTS``.
        """
        key = f"login_attempts:{identifier}"
        attempts = await redis.incr(key)
        # INCR and EXPIRE are separate commands. If the EXPIRE after the
        # first INCR was ever lost, the key has no TTL (TTL reports -1) and
        # the identifier would stay locked out forever, so repair that here.
        if attempts == 1 or await redis.ttl(key) == -1:
            await redis.expire(key, cls.LOCKOUT_SECONDS)
        return attempts > cls.MAX_ATTEMPTS

    @classmethod
    async def reset(cls, identifier: str):
        """Reset attempts on successful login."""
        await redis.delete(f"login_attempts:{identifier}")
@app.post("/auth/login")
async def login(credentials: LoginCredentials, request: Request):
    """Authenticate by email and password, rate-limited per email + client IP.

    Returns:
        A dict with an ``access_token`` for the authenticated user.

    Raises:
        HTTPException: 429 when the attempt limit is exceeded, 401 when the
            credentials are rejected.
    """
    # ``request.client`` can be None (e.g. some proxies and test clients).
    client_host = request.client.host if request.client else "unknown"
    # Normalise the email for the limiter key only, so case variants of one
    # address share a counter.
    identifier = f"{credentials.email.strip().lower()}:{client_host}"

    # Check rate limit
    if await LoginRateLimiter.check_and_increment(identifier):
        raise HTTPException(429, "Too many login attempts. Try again later.")

    # Verify credentials
    user = await authenticate_user(credentials.email, credentials.password)
    if not user:
        raise HTTPException(401, "Invalid credentials")

    # Reset rate limit on success
    await LoginRateLimiter.reset(identifier)

    return {"access_token": create_access_token(user.id)}
Account Lockout
async def handle_failed_login(user: User, db: AsyncSession):
    """Record a failed login and lock the account after repeated failures.

    Increments ``failed_login_attempts`` and stamps ``last_failed_login``.
    Once the count reaches 5 the account is locked for 15 minutes and the
    user is notified by email.

    Args:
        user: The account that failed to authenticate.
        db: Session used to persist the changes.
    """
    now = datetime.utcnow()
    user.failed_login_attempts += 1
    user.last_failed_login = now

    should_lock = user.failed_login_attempts >= 5
    if should_lock:
        user.locked_until = now + timedelta(minutes=15)

    # Read the address before committing: an async session may expire
    # attributes on commit.
    email = user.email

    # Persist first, so a failing mail service cannot discard the recorded
    # attempt or the lock.
    await db.commit()

    if should_lock:
        # Notify user
        await send_account_locked_email(email)
async def check_account_locked(user: User) -> bool:
    """Return True while ``user.locked_until`` is set and still in the future."""
    deadline = user.locked_until
    return bool(deadline and deadline > datetime.utcnow())
@app.middleware("http")
async def auth_security_headers(request: Request, call_next):
    """Add no-cache headers to responses for requests carrying ``Authorization``."""
    response = await call_next(request)

    # Requests without credentials pass through untouched.
    if not request.headers.get("Authorization"):
        return response

    # Prevent caching of authenticated responses
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value
    return response
Logout
@app.post("/auth/logout")
async def logout(
    credentials: HTTPAuthorizationCredentials = Depends(security),
):
    """Revoke the bearer token presented with the request."""
    # Verify the token, then put its ID on the revocation list.
    claims = verify_token(credentials.credentials)
    await revoke_token(claims.jti)

    confirmation = {"message": "Logged out successfully"}
    return confirmation
@app.post("/auth/logout-all")
async def logout_all_devices(
    user: User = Depends(get_current_user),
):
    """Invalidate all user sessions."""
    await revoke_all_user_tokens(user.id)

    confirmation = {"message": "Logged out from all devices"}
    return confirmation
Best Practices Summary
| Practice | Implementation |
|----------|----------------|
| Short-lived access tokens | 15 minutes max |
| Secure refresh tokens | HTTP-only cookies, rotation |
| Strong password hashing | Argon2id |
| MFA support | TOTP with backup codes |
| Brute force protection | Rate limiting per IP + email |
| Account lockout | After N failed attempts |
| Token revocation | Redis blacklist |
| Secure headers | No-cache for auth responses |
See Also