OWASP Top 10¶
The most critical web application security risks and how to prevent them.
Overview¶
| # | Risk | Impact | Prevention |
|---|---|---|---|
| A01 | Broken Access Control | Unauthorized access | Authorization checks, deny by default |
| A02 | Cryptographic Failures | Data exposure | Strong encryption, proper key management |
| A03 | Injection | Data theft, system compromise | Parameterized queries, input validation |
| A04 | Insecure Design | Fundamental flaws | Threat modeling, secure design patterns |
| A05 | Security Misconfiguration | System compromise | Hardening, automation, auditing |
| A06 | Vulnerable Components | Various | Dependency scanning, updates |
| A07 | Auth Failures | Account takeover | Strong auth, MFA, rate limiting |
| A08 | Data Integrity Failures | Compromise via updates | Signed updates, integrity checks |
| A09 | Logging Failures | Undetected breaches | Comprehensive logging, monitoring |
| A10 | SSRF | Internal network access | URL validation, network segmentation |
A01: Broken Access Control¶
The Vulnerability¶
Users can access unauthorized data or perform unauthorized actions.
# VULNERABLE: No authorization check
@app.get("/api/users/{user_id}/documents")
async def get_user_documents(user_id: int):
return await db.query(Document).filter(Document.user_id == user_id).all()
# Any authenticated user can access any user's documents!
Prevention¶
# SECURE: Verify ownership
@app.get("/api/users/{user_id}/documents")
async def get_user_documents(
user_id: int,
current_user: User = Depends(get_current_user),
):
# Verify the user can only access their own documents
if user_id != current_user.id and current_user.role != Role.ADMIN:
raise HTTPException(403, "Access denied")
return await db.query(Document).filter(Document.user_id == user_id).all()
// SECURE: Server-side authorization in Next.js
export async function GET(request: Request, { params }: { params: { userId: string } }) {
const session = await getServerSession();
if (!session) {
return Response.json({ error: "Unauthorized" }, { status: 401 });
}
// Users can only access their own data
if (params.userId !== session.user.id && session.user.role !== "admin") {
return Response.json({ error: "Forbidden" }, { status: 403 });
}
const documents = await getDocuments(params.userId);
return Response.json(documents);
}
A02: Cryptographic Failures¶
The Vulnerability¶
Sensitive data exposed due to weak cryptography or improper handling.
# VULNERABLE: Storing passwords in plain text
user.password = request.password # Never do this!
# VULNERABLE: Weak hashing
import hashlib
user.password = hashlib.md5(request.password.encode()).hexdigest() # Weak!
# VULNERABLE: Hardcoded encryption key
SECRET_KEY = "hardcoded-secret-key-123" # In source code!
Prevention¶
# SECURE: Strong password hashing
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
# SECURE: Proper encryption
from cryptography.fernet import Fernet
import os
# Key from environment, not hardcoded
ENCRYPTION_KEY = os.environ["ENCRYPTION_KEY"]
fernet = Fernet(ENCRYPTION_KEY)
def encrypt_sensitive_data(data: str) -> bytes:
return fernet.encrypt(data.encode())
def decrypt_sensitive_data(encrypted: bytes) -> str:
return fernet.decrypt(encrypted).decode()
A03: Injection¶
SQL Injection¶
# VULNERABLE: String concatenation
user_input = "'; DROP TABLE users; --"
query = f"SELECT * FROM users WHERE name = '{user_input}'"
# Results in: SELECT * FROM users WHERE name = ''; DROP TABLE users; --'
# SECURE: Parameterized queries
from sqlalchemy import select, text
# Using ORM (automatically parameterized)
result = await session.execute(
select(User).where(User.name == user_input)
)
# Using raw SQL with parameters
result = await session.execute(
text("SELECT * FROM users WHERE name = :name"),
{"name": user_input}
)
Command Injection¶
import subprocess
# VULNERABLE: Shell injection
filename = "file.txt; rm -rf /"
subprocess.run(f"cat {filename}", shell=True) # Dangerous!
# SECURE: Avoid shell, use list arguments
subprocess.run(["cat", filename]) # Safe: no shell interpretation
NoSQL Injection¶
# VULNERABLE: Unvalidated MongoDB query
user_input = {"$gt": ""} # Matches all documents
result = db.users.find({"password": user_input})
# SECURE: Type validation
from pydantic import BaseModel, field_validator
class LoginRequest(BaseModel):
username: str
password: str
@field_validator("password")
@classmethod
def password_must_be_string(cls, v):
if not isinstance(v, str):
raise ValueError("Password must be a string")
return v
A04: Insecure Design¶
The Vulnerability¶
Design flaws that cannot be fixed by implementation changes alone.
Prevention: Threat Modeling¶
## Threat Model: User Registration
### Assets
- User credentials
- Email addresses
- Personal data
### Threats (STRIDE)
1. Spoofing: Fake registration with someone else's email
2. Tampering: Modify registration data in transit
3. Repudiation: Deny creating account
4. Info Disclosure: Expose other users' data
5. DoS: Mass fake registrations
6. Elevation: Register as admin
### Mitigations
1. Email verification required
2. HTTPS enforced
3. Audit logging
4. Rate limiting, CAPTCHA
5. Input validation, role assignment server-side
Secure Design Patterns¶
# Rate limiting by design
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@app.post("/api/register")
@limiter.limit("3/hour") # 3 registrations per IP per hour
async def register(request: Request, user: UserCreate):
pass
# Email verification by design
async def register_user(user_data: UserCreate):
user = await create_user(user_data, is_verified=False)
token = create_verification_token(user.id)
await send_verification_email(user.email, token)
return {"message": "Check your email to verify your account"}
A05: Security Misconfiguration¶
Common Misconfigurations¶
# VULNERABLE: Debug mode in production
app = FastAPI(debug=True) # Exposes stack traces!
# VULNERABLE: Default credentials
DATABASE_URL = "postgresql://postgres:postgres@localhost/db"
# VULNERABLE: Unnecessary features enabled
CORS_ALLOW_ALL = True # Allows any origin
Prevention¶
# SECURE: Environment-based configuration
import os
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
debug: bool = False
database_url: str
allowed_origins: list[str] = ["https://myapp.com"]
model_config = SettingsConfigDict(env_file=".env")
settings = Settings()
app = FastAPI(debug=settings.debug)
# SECURE: Strict CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["Authorization", "Content-Type"],
)
# SECURE: Security headers
@app.middleware("http")
async def security_headers(request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["Content-Security-Policy"] = "default-src 'self'"
response.headers["Strict-Transport-Security"] = "max-age=31536000"
return response
A06: Vulnerable and Outdated Components¶
Detection¶
# Python: Check for vulnerabilities
pip install safety
safety check
# Or with pip-audit
pip install pip-audit
pip-audit
# JavaScript: Check for vulnerabilities
npm audit
bun pm audit
Prevention¶
# .github/workflows/security.yml
name: Security Scan
on:
push:
schedule:
- cron: "0 0 * * *" # Daily
jobs:
dependency-scan:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Python security scan
run: |
pip install safety
safety check -r requirements.txt
- name: JS security scan
run: bun pm audit
# pyproject.toml - Pin versions, update regularly
[tool.poetry.dependencies]
fastapi = "^0.109.0"
sqlalchemy = "^2.0.25"
pydantic = "^2.6.0"
A07: Identification and Authentication Failures¶
Common Failures¶
# VULNERABLE: Weak password requirements
if len(password) >= 4: # Too short!
pass
# VULNERABLE: No rate limiting on login
@app.post("/login")
async def login(credentials: LoginRequest):
pass # Can be brute-forced!
# VULNERABLE: Session fixation
@app.post("/login")
async def login(request: Request, credentials: LoginRequest):
# Reusing existing session ID after login
request.session["user_id"] = user.id
Prevention¶
# SECURE: Strong password validation
import re
def validate_password(password: str) -> list[str]:
errors = []
if len(password) < 12:
errors.append("Password must be at least 12 characters")
if not re.search(r"[A-Z]", password):
errors.append("Must contain uppercase letter")
if not re.search(r"[a-z]", password):
errors.append("Must contain lowercase letter")
if not re.search(r"\d", password):
errors.append("Must contain digit")
if not re.search(r"[!@#$%^&*]", password):
errors.append("Must contain special character")
return errors
# SECURE: Rate limiting
@app.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginRequest):
pass
# SECURE: Session regeneration
@app.post("/login")
async def login(request: Request, credentials: LoginRequest):
user = await authenticate(credentials)
# Regenerate session ID after login
request.session.clear()
request.session["user_id"] = user.id
request.session.regenerate()
A08: Software and Data Integrity Failures¶
The Vulnerability¶
Untrusted sources for code or data updates.
# VULNERABLE: Unsigned updates
async def update_app():
code = await fetch("http://updates.example.com/latest.zip")
extract_and_run(code) # No verification!
# VULNERABLE: Deserialization of untrusted data
import pickle
data = pickle.loads(user_input) # Remote code execution!
Prevention¶
# SECURE: Verify signatures
import hashlib
import hmac
async def update_app(update_url: str, signature: str):
code = await fetch(update_url)
# Verify signature
expected_sig = hmac.new(
SECRET_KEY.encode(),
code,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_sig):
raise ValueError("Invalid signature")
extract_and_run(code)
# SECURE: Use safe serialization
import json
data = json.loads(user_input) # Safe: no code execution
# CI/CD: Verify dependencies
- name: Verify lockfile integrity
run: |
bun install --frozen-lockfile
pip install --require-hashes -r requirements.txt
A09: Security Logging and Monitoring Failures¶
The Vulnerability¶
Attacks go undetected due to insufficient logging.
Prevention¶
import structlog
from datetime import datetime
log = structlog.get_logger()
# Log security events
async def login(credentials: LoginRequest, request: Request):
user = await get_user_by_email(credentials.email)
if not user or not verify_password(credentials.password, user.password):
log.warning(
"login_failed",
email=credentials.email,
ip=request.client.host,
user_agent=request.headers.get("user-agent"),
timestamp=datetime.utcnow().isoformat(),
)
raise HTTPException(401, "Invalid credentials")
log.info(
"login_success",
user_id=user.id,
ip=request.client.host,
timestamp=datetime.utcnow().isoformat(),
)
# Log authorization failures
async def check_permission(user: User, resource: Resource, action: str):
allowed = has_permission(user, resource, action)
if not allowed:
log.warning(
"authorization_denied",
user_id=user.id,
resource_type=type(resource).__name__,
resource_id=resource.id,
action=action,
)
return allowed
Alert on Suspicious Activity¶
# Alert on multiple failed logins
FAILED_LOGIN_THRESHOLD = 5
ALERT_WINDOW_SECONDS = 300
async def check_login_anomaly(email: str, ip: str):
key = f"failed_login:{email}:{ip}"
count = await redis.incr(key)
if count == 1:
await redis.expire(key, ALERT_WINDOW_SECONDS)
if count >= FAILED_LOGIN_THRESHOLD:
await send_security_alert(
f"Multiple failed login attempts for {email} from {ip}"
)
A10: Server-Side Request Forgery (SSRF)¶
The Vulnerability¶
Attacker tricks server into making requests to internal resources.
# VULNERABLE: Unvalidated URL
@app.post("/fetch-url")
async def fetch_url(url: str):
response = await httpx.get(url) # Can access internal services!
return response.text
# Attacker sends: url=http://169.254.169.254/latest/meta-data/
# Server fetches AWS metadata!
Prevention¶
import ipaddress
from urllib.parse import urlparse
import socket
BLOCKED_HOSTS = {
"localhost",
"127.0.0.1",
"0.0.0.0",
"169.254.169.254", # AWS metadata
"metadata.google.internal", # GCP metadata
}
BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"),
ipaddress.ip_network("172.16.0.0/12"),
ipaddress.ip_network("192.168.0.0/16"),
ipaddress.ip_network("127.0.0.0/8"),
ipaddress.ip_network("169.254.0.0/16"),
]
def is_safe_url(url: str) -> bool:
try:
parsed = urlparse(url)
# Only allow HTTP/HTTPS
if parsed.scheme not in ("http", "https"):
return False
hostname = parsed.hostname
if not hostname:
return False
# Block known dangerous hosts
if hostname.lower() in BLOCKED_HOSTS:
return False
# Resolve and check IP
ip = socket.gethostbyname(hostname)
ip_obj = ipaddress.ip_address(ip)
for network in BLOCKED_NETWORKS:
if ip_obj in network:
return False
return True
except Exception:
return False
@app.post("/fetch-url")
async def fetch_url(url: str):
if not is_safe_url(url):
raise HTTPException(400, "Invalid or blocked URL")
response = await httpx.get(url, follow_redirects=False)
return response.text
Security Checklist¶
## Pre-Deploy Security Checklist
### Access Control (A01)
- [ ] Authorization checks on all endpoints
- [ ] Resource ownership verified
- [ ] Deny by default
### Cryptography (A02)
- [ ] Passwords hashed with Argon2/bcrypt
- [ ] Sensitive data encrypted at rest
- [ ] TLS for all connections
- [ ] No hardcoded secrets
### Injection (A03)
- [ ] Parameterized queries used
- [ ] Input validated with Pydantic/Zod
- [ ] No shell=True with user input
### Design (A04)
- [ ] Threat modeling completed
- [ ] Rate limiting implemented
- [ ] Secure defaults
### Configuration (A05)
- [ ] Debug mode disabled
- [ ] Security headers set
- [ ] CORS properly configured
- [ ] Error messages don't leak info
### Dependencies (A06)
- [ ] Dependencies scanned
- [ ] No known vulnerabilities
- [ ] Automated scanning in CI
### Authentication (A07)
- [ ] Strong password requirements
- [ ] Rate limiting on auth endpoints
- [ ] Session regeneration on login
- [ ] MFA available
### Integrity (A08)
- [ ] Signed updates
- [ ] Lockfile integrity
- [ ] No pickle/unsafe deserialization
### Logging (A09)
- [ ] Security events logged
- [ ] Alerts on anomalies
- [ ] Logs don't contain secrets
### SSRF (A10)
- [ ] URL validation
- [ ] Block internal IPs
- [ ] No unvalidated redirects
See Also¶
- Input Validation -- Pydantic and Zod validation patterns for injection prevention
- Authentication -- JWT, MFA, and brute-force protection (addresses A07)
- Frontend Security -- Client-side XSS prevention and environment variable safety