Skip to content

OWASP Top 10

The most critical web application security risks and how to prevent them.

Overview

# Risk Impact Prevention
A01 Broken Access Control Unauthorized access Authorization checks, deny by default
A02 Cryptographic Failures Data exposure Strong encryption, proper key management
A03 Injection Data theft, system compromise Parameterized queries, input validation
A04 Insecure Design Fundamental flaws Threat modeling, secure design patterns
A05 Security Misconfiguration System compromise Hardening, automation, auditing
A06 Vulnerable Components Various Dependency scanning, updates
A07 Auth Failures Account takeover Strong auth, MFA, rate limiting
A08 Data Integrity Failures Compromise via updates Signed updates, integrity checks
A09 Logging Failures Undetected breaches Comprehensive logging, monitoring
A10 SSRF Internal network access URL validation, network segmentation

A01: Broken Access Control

The Vulnerability

Users can access unauthorized data or perform unauthorized actions.

# VULNERABLE: No authorization check
@app.get("/api/users/{user_id}/documents")
async def get_user_documents(user_id: int):
    return await db.query(Document).filter(Document.user_id == user_id).all()
    # Any authenticated user can access any user's documents!

Prevention

# SECURE: Verify ownership
@app.get("/api/users/{user_id}/documents")
async def get_user_documents(
    user_id: int,
    current_user: User = Depends(get_current_user),
):
    # Verify the user can only access their own documents
    if user_id != current_user.id and current_user.role != Role.ADMIN:
        raise HTTPException(403, "Access denied")

    return await db.query(Document).filter(Document.user_id == user_id).all()
// SECURE: Server-side authorization in Next.js
export async function GET(request: Request, { params }: { params: { userId: string } }) {
  const session = await getServerSession();

  if (!session) {
    return Response.json({ error: "Unauthorized" }, { status: 401 });
  }

  // Users can only access their own data
  if (params.userId !== session.user.id && session.user.role !== "admin") {
    return Response.json({ error: "Forbidden" }, { status: 403 });
  }

  const documents = await getDocuments(params.userId);
  return Response.json(documents);
}

A02: Cryptographic Failures

The Vulnerability

Sensitive data exposed due to weak cryptography or improper handling.

# VULNERABLE: Storing passwords in plain text
user.password = request.password  # Never do this!

# VULNERABLE: Weak hashing
import hashlib
user.password = hashlib.md5(request.password.encode()).hexdigest()  # Weak!

# VULNERABLE: Hardcoded encryption key
SECRET_KEY = "hardcoded-secret-key-123"  # In source code!

Prevention

# SECURE: Strong password hashing
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)
# SECURE: Proper encryption
from cryptography.fernet import Fernet
import os

# Key from environment, not hardcoded
ENCRYPTION_KEY = os.environ["ENCRYPTION_KEY"]
fernet = Fernet(ENCRYPTION_KEY)

def encrypt_sensitive_data(data: str) -> bytes:
    return fernet.encrypt(data.encode())

def decrypt_sensitive_data(encrypted: bytes) -> str:
    return fernet.decrypt(encrypted).decode()

A03: Injection

SQL Injection

# VULNERABLE: String concatenation
user_input = "'; DROP TABLE users; --"
query = f"SELECT * FROM users WHERE name = '{user_input}'"
# Results in: SELECT * FROM users WHERE name = ''; DROP TABLE users; --'

# SECURE: Parameterized queries
from sqlalchemy import select, text

# Using ORM (automatically parameterized)
result = await session.execute(
    select(User).where(User.name == user_input)
)

# Using raw SQL with parameters
result = await session.execute(
    text("SELECT * FROM users WHERE name = :name"),
    {"name": user_input}
)

Command Injection

import subprocess

# VULNERABLE: Shell injection
filename = "file.txt; rm -rf /"
subprocess.run(f"cat {filename}", shell=True)  # Dangerous!

# SECURE: Avoid shell, use list arguments
subprocess.run(["cat", filename])  # Safe: no shell interpretation

NoSQL Injection

# VULNERABLE: Unvalidated MongoDB query
user_input = {"$gt": ""}  # Matches all documents
result = db.users.find({"password": user_input})

# SECURE: Type validation
from pydantic import BaseModel, field_validator

class LoginRequest(BaseModel):
    username: str
    password: str

    @field_validator("password")
    @classmethod
    def password_must_be_string(cls, v):
        if not isinstance(v, str):
            raise ValueError("Password must be a string")
        return v

A04: Insecure Design

The Vulnerability

Design flaws that cannot be fixed by implementation changes alone.

Prevention: Threat Modeling

## Threat Model: User Registration

### Assets
- User credentials
- Email addresses
- Personal data

### Threats (STRIDE)
1. Spoofing: Fake registration with someone else's email
2. Tampering: Modify registration data in transit
3. Repudiation: Deny creating account
4. Info Disclosure: Expose other users' data
5. DoS: Mass fake registrations
6. Elevation: Register as admin

### Mitigations
1. Email verification required
2. HTTPS enforced
3. Audit logging
4. Rate limiting, CAPTCHA
5. Input validation, role assignment server-side

Secure Design Patterns

# Rate limiting by design
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@app.post("/api/register")
@limiter.limit("3/hour")  # 3 registrations per IP per hour
async def register(request: Request, user: UserCreate):
    pass

# Email verification by design
async def register_user(user_data: UserCreate):
    user = await create_user(user_data, is_verified=False)
    token = create_verification_token(user.id)
    await send_verification_email(user.email, token)
    return {"message": "Check your email to verify your account"}

A05: Security Misconfiguration

Common Misconfigurations

# VULNERABLE: Debug mode in production
app = FastAPI(debug=True)  # Exposes stack traces!

# VULNERABLE: Default credentials
DATABASE_URL = "postgresql://postgres:postgres@localhost/db"

# VULNERABLE: Unnecessary features enabled
CORS_ALLOW_ALL = True  # Allows any origin

Prevention

# SECURE: Environment-based configuration
import os
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    debug: bool = False
    database_url: str
    allowed_origins: list[str] = ["https://myapp.com"]

    model_config = SettingsConfigDict(env_file=".env")

settings = Settings()

app = FastAPI(debug=settings.debug)

# SECURE: Strict CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.allowed_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
)

# SECURE: Security headers
@app.middleware("http")
async def security_headers(request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    response.headers["Strict-Transport-Security"] = "max-age=31536000"
    return response

A06: Vulnerable and Outdated Components

Detection

# Python: Check for vulnerabilities
pip install safety
safety check

# Or with pip-audit
pip install pip-audit
pip-audit

# JavaScript: Check for vulnerabilities
npm audit
bun pm audit

Prevention

# .github/workflows/security.yml
name: Security Scan

on:
  push:
  schedule:
    - cron: "0 0 * * *"  # Daily

jobs:
  dependency-scan:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4

      - name: Python security scan
        run: |
          pip install safety
          safety check -r requirements.txt

      - name: JS security scan
        run: bun pm audit
# pyproject.toml - Pin versions, update regularly
[tool.poetry.dependencies]
fastapi = "^0.109.0"
sqlalchemy = "^2.0.25"
pydantic = "^2.6.0"

A07: Identification and Authentication Failures

Common Failures

# VULNERABLE: Weak password requirements
if len(password) >= 4:  # Too short!
    pass

# VULNERABLE: No rate limiting on login
@app.post("/login")
async def login(credentials: LoginRequest):
    pass  # Can be brute-forced!

# VULNERABLE: Session fixation
@app.post("/login")
async def login(request: Request, credentials: LoginRequest):
    # Reusing existing session ID after login
    request.session["user_id"] = user.id

Prevention

# SECURE: Strong password validation
import re

def validate_password(password: str) -> list[str]:
    errors = []
    if len(password) < 12:
        errors.append("Password must be at least 12 characters")
    if not re.search(r"[A-Z]", password):
        errors.append("Must contain uppercase letter")
    if not re.search(r"[a-z]", password):
        errors.append("Must contain lowercase letter")
    if not re.search(r"\d", password):
        errors.append("Must contain digit")
    if not re.search(r"[!@#$%^&*]", password):
        errors.append("Must contain special character")
    return errors

# SECURE: Rate limiting
@app.post("/login")
@limiter.limit("5/minute")
async def login(request: Request, credentials: LoginRequest):
    pass

# SECURE: Session regeneration
@app.post("/login")
async def login(request: Request, credentials: LoginRequest):
    user = await authenticate(credentials)
    # Regenerate session ID after login
    request.session.clear()
    request.session["user_id"] = user.id
    request.session.regenerate()

A08: Software and Data Integrity Failures

The Vulnerability

Untrusted sources for code or data updates.

# VULNERABLE: Unsigned updates
async def update_app():
    code = await fetch("http://updates.example.com/latest.zip")
    extract_and_run(code)  # No verification!

# VULNERABLE: Deserialization of untrusted data
import pickle
data = pickle.loads(user_input)  # Remote code execution!

Prevention

# SECURE: Verify signatures
import hashlib
import hmac

async def update_app(update_url: str, signature: str):
    code = await fetch(update_url)

    # Verify signature
    expected_sig = hmac.new(
        SECRET_KEY.encode(),
        code,
        hashlib.sha256
    ).hexdigest()

    if not hmac.compare_digest(signature, expected_sig):
        raise ValueError("Invalid signature")

    extract_and_run(code)

# SECURE: Use safe serialization
import json
data = json.loads(user_input)  # Safe: no code execution
# CI/CD: Verify dependencies
- name: Verify lockfile integrity
  run: |
    bun install --frozen-lockfile
    pip install --require-hashes -r requirements.txt

A09: Security Logging and Monitoring Failures

The Vulnerability

Attacks go undetected due to insufficient logging.

Prevention

import structlog
from datetime import datetime

log = structlog.get_logger()

# Log security events
async def login(credentials: LoginRequest, request: Request):
    user = await get_user_by_email(credentials.email)

    if not user or not verify_password(credentials.password, user.password):
        log.warning(
            "login_failed",
            email=credentials.email,
            ip=request.client.host,
            user_agent=request.headers.get("user-agent"),
            timestamp=datetime.utcnow().isoformat(),
        )
        raise HTTPException(401, "Invalid credentials")

    log.info(
        "login_success",
        user_id=user.id,
        ip=request.client.host,
        timestamp=datetime.utcnow().isoformat(),
    )

# Log authorization failures
async def check_permission(user: User, resource: Resource, action: str):
    allowed = has_permission(user, resource, action)

    if not allowed:
        log.warning(
            "authorization_denied",
            user_id=user.id,
            resource_type=type(resource).__name__,
            resource_id=resource.id,
            action=action,
        )

    return allowed

Alert on Suspicious Activity

# Alert on multiple failed logins
FAILED_LOGIN_THRESHOLD = 5
ALERT_WINDOW_SECONDS = 300

async def check_login_anomaly(email: str, ip: str):
    key = f"failed_login:{email}:{ip}"
    count = await redis.incr(key)

    if count == 1:
        await redis.expire(key, ALERT_WINDOW_SECONDS)

    if count >= FAILED_LOGIN_THRESHOLD:
        await send_security_alert(
            f"Multiple failed login attempts for {email} from {ip}"
        )

A10: Server-Side Request Forgery (SSRF)

The Vulnerability

Attacker tricks server into making requests to internal resources.

# VULNERABLE: Unvalidated URL
@app.post("/fetch-url")
async def fetch_url(url: str):
    response = await httpx.get(url)  # Can access internal services!
    return response.text

# Attacker sends: url=http://169.254.169.254/latest/meta-data/
# Server fetches AWS metadata!

Prevention

import ipaddress
from urllib.parse import urlparse
import socket

BLOCKED_HOSTS = {
    "localhost",
    "127.0.0.1",
    "0.0.0.0",
    "169.254.169.254",  # AWS metadata
    "metadata.google.internal",  # GCP metadata
}

BLOCKED_NETWORKS = [
    ipaddress.ip_network("10.0.0.0/8"),
    ipaddress.ip_network("172.16.0.0/12"),
    ipaddress.ip_network("192.168.0.0/16"),
    ipaddress.ip_network("127.0.0.0/8"),
    ipaddress.ip_network("169.254.0.0/16"),
]

def is_safe_url(url: str) -> bool:
    try:
        parsed = urlparse(url)

        # Only allow HTTP/HTTPS
        if parsed.scheme not in ("http", "https"):
            return False

        hostname = parsed.hostname
        if not hostname:
            return False

        # Block known dangerous hosts
        if hostname.lower() in BLOCKED_HOSTS:
            return False

        # Resolve and check IP
        ip = socket.gethostbyname(hostname)
        ip_obj = ipaddress.ip_address(ip)

        for network in BLOCKED_NETWORKS:
            if ip_obj in network:
                return False

        return True
    except Exception:
        return False

@app.post("/fetch-url")
async def fetch_url(url: str):
    if not is_safe_url(url):
        raise HTTPException(400, "Invalid or blocked URL")

    response = await httpx.get(url, follow_redirects=False)
    return response.text

Security Checklist

## Pre-Deploy Security Checklist

### Access Control (A01)
- [ ] Authorization checks on all endpoints
- [ ] Resource ownership verified
- [ ] Deny by default

### Cryptography (A02)
- [ ] Passwords hashed with Argon2/bcrypt
- [ ] Sensitive data encrypted at rest
- [ ] TLS for all connections
- [ ] No hardcoded secrets

### Injection (A03)
- [ ] Parameterized queries used
- [ ] Input validated with Pydantic/Zod
- [ ] No shell=True with user input

### Design (A04)
- [ ] Threat modeling completed
- [ ] Rate limiting implemented
- [ ] Secure defaults

### Configuration (A05)
- [ ] Debug mode disabled
- [ ] Security headers set
- [ ] CORS properly configured
- [ ] Error messages don't leak info

### Dependencies (A06)
- [ ] Dependencies scanned
- [ ] No known vulnerabilities
- [ ] Automated scanning in CI

### Authentication (A07)
- [ ] Strong password requirements
- [ ] Rate limiting on auth endpoints
- [ ] Session regeneration on login
- [ ] MFA available

### Integrity (A08)
- [ ] Signed updates
- [ ] Lockfile integrity
- [ ] No pickle/unsafe deserialization

### Logging (A09)
- [ ] Security events logged
- [ ] Alerts on anomalies
- [ ] Logs don't contain secrets

### SSRF (A10)
- [ ] URL validation
- [ ] Block internal IPs
- [ ] No unvalidated redirects

See Also