Skip to content

Input Validation

Never trust user input. Validate everything server-side.

The Validation Hierarchy

┌─────────────────────────────────────┐
│     Client-Side Validation          │  ← UX only, easily bypassed
├─────────────────────────────────────┤
│     API Layer Validation            │  ← Type checking, format validation
├─────────────────────────────────────┤
│     Business Logic Validation       │  ← Domain rules, permissions
├─────────────────────────────────────┤
│     Database Constraints            │  ← Final safety net
└─────────────────────────────────────┘

Pydantic Validation (Python)

Basic Validation

from pydantic import BaseModel, ConfigDict, Field, EmailStr
from typing import Optional
from datetime import date

class UserCreate(BaseModel):
    model_config = ConfigDict(str_strip_whitespace=True)

    email: EmailStr
    username: str = Field(..., min_length=3, max_length=30, pattern=r"^[a-zA-Z0-9_]+$")
    age: int = Field(..., ge=13, le=120)
    bio: Optional[str] = Field(None, max_length=500)

Custom Validators

from pydantic import BaseModel, field_validator, model_validator
import re

class PasswordChange(BaseModel):
    current_password: str
    new_password: str
    confirm_password: str

    @field_validator("new_password")
    @classmethod
    def password_strength(cls, v: str) -> str:
        if len(v) < 12:
            raise ValueError("Password must be at least 12 characters")
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain lowercase letter")
        if not re.search(r"\d", v):
            raise ValueError("Password must contain digit")
        if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
            raise ValueError("Password must contain special character")
        return v

    @model_validator(mode="after")
    def passwords_match(self) -> "PasswordChange":
        if self.new_password != self.confirm_password:
            raise ValueError("Passwords do not match")
        if self.new_password == self.current_password:
            raise ValueError("New password must be different from current")
        return self

Sanitization

from pydantic import BaseModel, Field, field_validator
import html
import re

class Comment(BaseModel):
    content: str = Field(..., max_length=10000)

    @field_validator("content")
    @classmethod
    def sanitize_content(cls, v: str) -> str:
        # Remove null bytes
        v = v.replace("\x00", "")

        # Escape HTML to prevent XSS
        v = html.escape(v)

        # Remove excessive whitespace
        v = re.sub(r"\s+", " ", v).strip()

        return v

File Upload Validation

from pydantic import BaseModel
from fastapi import UploadFile
import magic  # python-magic

ALLOWED_MIME_TYPES = {
    "image/jpeg",
    "image/png",
    "image/gif",
    "image/webp",
}
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB

async def validate_image_upload(file: UploadFile) -> bytes:
    # Check file size
    content = await file.read()
    if len(content) > MAX_FILE_SIZE:
        raise ValueError(f"File too large. Maximum size is {MAX_FILE_SIZE // 1024 // 1024}MB")

    # Check actual MIME type (not just extension)
    mime_type = magic.from_buffer(content, mime=True)
    if mime_type not in ALLOWED_MIME_TYPES:
        raise ValueError(f"Invalid file type: {mime_type}")

    # Reset file position
    await file.seek(0)

    return content

Zod Validation (TypeScript)

Basic Schemas

import { z } from "zod";

const UserCreateSchema = z.object({
  email: z.string().email(),
  username: z
    .string()
    .min(3)
    .max(30)
    .regex(/^[a-zA-Z0-9_]+$/, "Username must be alphanumeric"),
  age: z.number().int().min(13).max(120),
  bio: z.string().max(500).optional(),
});

type UserCreate = z.infer<typeof UserCreateSchema>;

// Usage
function createUser(data: unknown) {
  const validated = UserCreateSchema.parse(data);
  // validated is now typed as UserCreate
}

Custom Refinements

const PasswordSchema = z
  .string()
  .min(12, "Password must be at least 12 characters")
  .refine((val) => /[A-Z]/.test(val), "Must contain uppercase letter")
  .refine((val) => /[a-z]/.test(val), "Must contain lowercase letter")
  .refine((val) => /\d/.test(val), "Must contain digit")
  .refine(
    (val) => /[!@#$%^&*(),.?":{}|<>]/.test(val),
    "Must contain special character"
  );

const PasswordChangeSchema = z
  .object({
    currentPassword: z.string(),
    newPassword: PasswordSchema,
    confirmPassword: z.string(),
  })
  .refine((data) => data.newPassword === data.confirmPassword, {
    message: "Passwords don't match",
    path: ["confirmPassword"],
  })
  .refine((data) => data.newPassword !== data.currentPassword, {
    message: "New password must be different",
    path: ["newPassword"],
  });

Server Actions Validation

"use server";

import { z } from "zod";

const CreatePostSchema = z.object({
  title: z.string().min(1).max(200),
  content: z.string().min(1).max(50000),
  tags: z.array(z.string()).max(10),
});

export async function createPost(formData: FormData) {
  const rawData = {
    title: formData.get("title"),
    content: formData.get("content"),
    tags: formData.getAll("tags"),
  };

  // Validate
  const result = CreatePostSchema.safeParse(rawData);
  if (!result.success) {
    return { error: result.error.flatten() };
  }

  // Use validated data
  const { title, content, tags } = result.data;
  // ... create post
}

Common Validation Patterns

UUID Validation

from pydantic import BaseModel, field_validator
from uuid import UUID

class ResourceRequest(BaseModel):
    resource_id: str

    @field_validator("resource_id")
    @classmethod
    def validate_uuid(cls, v: str) -> str:
        try:
            UUID(v)
        except ValueError:
            raise ValueError("Invalid resource ID format")
        return v
const UUIDSchema = z.string().uuid();

URL Validation

from pydantic import BaseModel, HttpUrl, field_validator
from urllib.parse import urlparse

class WebhookConfig(BaseModel):
    url: HttpUrl

    @field_validator("url")
    @classmethod
    def validate_url(cls, v: HttpUrl) -> HttpUrl:
        parsed = urlparse(str(v))

        # Only allow HTTPS
        if parsed.scheme != "https":
            raise ValueError("URL must use HTTPS")

        # Block internal IPs (SSRF prevention)
        forbidden_hosts = {"localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"}
        if parsed.hostname in forbidden_hosts:
            raise ValueError("URL cannot point to internal addresses")

        return v

Date Range Validation

from pydantic import BaseModel, model_validator
from datetime import date, timedelta

class DateRange(BaseModel):
    start_date: date
    end_date: date

    @model_validator(mode="after")
    def validate_dates(self) -> "DateRange":
        if self.end_date < self.start_date:
            raise ValueError("End date must be after start date")
        if (self.end_date - self.start_date).days > 365:
            raise ValueError("Date range cannot exceed 1 year")

        return self

Enum Validation

from enum import Enum
from pydantic import BaseModel

class OrderStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

class OrderUpdate(BaseModel):
    status: OrderStatus  # Only valid enum values accepted
const OrderStatusSchema = z.enum([
  "pending",
  "processing",
  "shipped",
  "delivered",
  "cancelled",
]);

Preventing Injection Attacks

SQL Injection Prevention

# NEVER do this
query = f"SELECT * FROM users WHERE id = '{user_input}'"  # VULNERABLE

# Do this instead
from sqlalchemy import select
result = await session.execute(
    select(User).where(User.id == user_input)  # Parameterized
)

# Or with raw SQL (parameterized)
result = await session.execute(
    text("SELECT * FROM users WHERE id = :id"),
    {"id": user_input}
)

Command Injection Prevention

import subprocess
import shlex

# NEVER do this
subprocess.run(f"echo {user_input}", shell=True)  # VULNERABLE

# Do this instead
subprocess.run(["echo", user_input])  # Safe: no shell interpretation

# If shell is required, escape properly
subprocess.run(f"echo {shlex.quote(user_input)}", shell=True)

Path Traversal Prevention

from pathlib import Path

UPLOAD_DIR = Path("/var/uploads")

def get_safe_path(filename: str) -> Path:
    # Remove any path components
    safe_name = Path(filename).name

    # Construct full path
    full_path = UPLOAD_DIR / safe_name

    # Verify it's still within upload directory
    if not full_path.resolve().is_relative_to(UPLOAD_DIR.resolve()):
        raise ValueError("Invalid filename")

    return full_path

LDAP Injection Prevention

import re

def sanitize_ldap_input(value: str) -> str:
    """Escape special LDAP characters."""
    escape_chars = {
        "\\": r"\5c",
        "*": r"\2a",
        "(": r"\28",
        ")": r"\29",
        "\x00": r"\00",
    }
    for char, escape in escape_chars.items():
        value = value.replace(char, escape)
    return value

Rate Limiting

from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.post("/login")
@limiter.limit("5/minute")  # Strict for auth endpoints
async def login(request: Request):
    pass

@app.get("/api/items")
@limiter.limit("100/minute")  # More lenient for regular endpoints
async def list_items(request: Request):
    pass

Validation Error Handling

Error Handling Integration

For how validation errors integrate with the application's error hierarchy, see Error Handling Patterns.

FastAPI Error Response

from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse

app = FastAPI()

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
    errors = []
    for error in exc.errors():
        errors.append({
            "field": ".".join(str(x) for x in error["loc"]),
            "message": error["msg"],
        })

    return JSONResponse(
        status_code=422,
        content={
            "error": "validation_error",
            "message": "Invalid request data",
            "details": errors,
        }
    )

Best Practices Summary

Practice Implementation
Validate on server Never trust client validation alone
Use type coercion Pydantic/Zod handle type conversion
Whitelist, don't blacklist Define what's allowed, not what's forbidden
Validate early Reject bad input before processing
Sanitize output Escape for context (HTML, SQL, etc.)
Limit sizes Max lengths, file sizes, array lengths
Check MIME types Don't trust file extensions
Block internal URLs Prevent SSRF with URL validation

See Also