Input Validation¶
Never trust user input. Validate everything server-side.
The Validation Hierarchy¶
┌─────────────────────────────────────┐
│ Client-Side Validation │ ← UX only, easily bypassed
├─────────────────────────────────────┤
│ API Layer Validation │ ← Type checking, format validation
├─────────────────────────────────────┤
│ Business Logic Validation │ ← Domain rules, permissions
├─────────────────────────────────────┤
│ Database Constraints │ ← Final safety net
└─────────────────────────────────────┘
Pydantic Validation (Python)¶
Basic Validation¶
from pydantic import BaseModel, ConfigDict, Field, EmailStr
from typing import Optional
from datetime import date
class UserCreate(BaseModel):
model_config = ConfigDict(str_strip_whitespace=True)
email: EmailStr
username: str = Field(..., min_length=3, max_length=30, pattern=r"^[a-zA-Z0-9_]+$")
age: int = Field(..., ge=13, le=120)
bio: Optional[str] = Field(None, max_length=500)
Custom Validators¶
from pydantic import BaseModel, field_validator, model_validator
import re
class PasswordChange(BaseModel):
current_password: str
new_password: str
confirm_password: str
@field_validator("new_password")
@classmethod
def password_strength(cls, v: str) -> str:
if len(v) < 12:
raise ValueError("Password must be at least 12 characters")
if not re.search(r"[A-Z]", v):
raise ValueError("Password must contain uppercase letter")
if not re.search(r"[a-z]", v):
raise ValueError("Password must contain lowercase letter")
if not re.search(r"\d", v):
raise ValueError("Password must contain digit")
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
raise ValueError("Password must contain special character")
return v
@model_validator(mode="after")
def passwords_match(self) -> "PasswordChange":
if self.new_password != self.confirm_password:
raise ValueError("Passwords do not match")
if self.new_password == self.current_password:
raise ValueError("New password must be different from current")
return self
Sanitization¶
from pydantic import BaseModel, Field, field_validator
import html
import re
class Comment(BaseModel):
content: str = Field(..., max_length=10000)
@field_validator("content")
@classmethod
def sanitize_content(cls, v: str) -> str:
# Remove null bytes
v = v.replace("\x00", "")
# Escape HTML to prevent XSS
v = html.escape(v)
# Remove excessive whitespace
v = re.sub(r"\s+", " ", v).strip()
return v
File Upload Validation¶
from pydantic import BaseModel
from fastapi import UploadFile
import magic # python-magic
ALLOWED_MIME_TYPES = {
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
}
MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
async def validate_image_upload(file: UploadFile) -> bytes:
# Check file size
content = await file.read()
if len(content) > MAX_FILE_SIZE:
raise ValueError(f"File too large. Maximum size is {MAX_FILE_SIZE // 1024 // 1024}MB")
# Check actual MIME type (not just extension)
mime_type = magic.from_buffer(content, mime=True)
if mime_type not in ALLOWED_MIME_TYPES:
raise ValueError(f"Invalid file type: {mime_type}")
# Reset file position
await file.seek(0)
return content
Zod Validation (TypeScript)¶
Basic Schemas¶
import { z } from "zod";
const UserCreateSchema = z.object({
email: z.string().email(),
username: z
.string()
.min(3)
.max(30)
.regex(/^[a-zA-Z0-9_]+$/, "Username must be alphanumeric"),
age: z.number().int().min(13).max(120),
bio: z.string().max(500).optional(),
});
type UserCreate = z.infer<typeof UserCreateSchema>;
// Usage
function createUser(data: unknown) {
const validated = UserCreateSchema.parse(data);
// validated is now typed as UserCreate
}
Custom Refinements¶
const PasswordSchema = z
.string()
.min(12, "Password must be at least 12 characters")
.refine((val) => /[A-Z]/.test(val), "Must contain uppercase letter")
.refine((val) => /[a-z]/.test(val), "Must contain lowercase letter")
.refine((val) => /\d/.test(val), "Must contain digit")
.refine(
(val) => /[!@#$%^&*(),.?":{}|<>]/.test(val),
"Must contain special character"
);
const PasswordChangeSchema = z
.object({
currentPassword: z.string(),
newPassword: PasswordSchema,
confirmPassword: z.string(),
})
.refine((data) => data.newPassword === data.confirmPassword, {
message: "Passwords don't match",
path: ["confirmPassword"],
})
.refine((data) => data.newPassword !== data.currentPassword, {
message: "New password must be different",
path: ["newPassword"],
});
Server Actions Validation¶
"use server";
import { z } from "zod";
const CreatePostSchema = z.object({
title: z.string().min(1).max(200),
content: z.string().min(1).max(50000),
tags: z.array(z.string()).max(10),
});
export async function createPost(formData: FormData) {
const rawData = {
title: formData.get("title"),
content: formData.get("content"),
tags: formData.getAll("tags"),
};
// Validate
const result = CreatePostSchema.safeParse(rawData);
if (!result.success) {
return { error: result.error.flatten() };
}
// Use validated data
const { title, content, tags } = result.data;
// ... create post
}
Common Validation Patterns¶
UUID Validation¶
from pydantic import BaseModel, field_validator
from uuid import UUID
class ResourceRequest(BaseModel):
resource_id: str
@field_validator("resource_id")
@classmethod
def validate_uuid(cls, v: str) -> str:
try:
UUID(v)
except ValueError:
raise ValueError("Invalid resource ID format")
return v
URL Validation¶
from pydantic import BaseModel, HttpUrl, field_validator
from urllib.parse import urlparse
class WebhookConfig(BaseModel):
url: HttpUrl
@field_validator("url")
@classmethod
def validate_url(cls, v: HttpUrl) -> HttpUrl:
parsed = urlparse(str(v))
# Only allow HTTPS
if parsed.scheme != "https":
raise ValueError("URL must use HTTPS")
# Block internal IPs (SSRF prevention)
forbidden_hosts = {"localhost", "127.0.0.1", "0.0.0.0", "169.254.169.254"}
if parsed.hostname in forbidden_hosts:
raise ValueError("URL cannot point to internal addresses")
return v
Date Range Validation¶
from pydantic import BaseModel, model_validator
from datetime import date, timedelta
class DateRange(BaseModel):
start_date: date
end_date: date
@model_validator(mode="after")
def validate_dates(self) -> "DateRange":
if self.end_date < self.start_date:
raise ValueError("End date must be after start date")
if (self.end_date - self.start_date).days > 365:
raise ValueError("Date range cannot exceed 1 year")
return self
Enum Validation¶
from enum import Enum
from pydantic import BaseModel
class OrderStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
class OrderUpdate(BaseModel):
status: OrderStatus # Only valid enum values accepted
const OrderStatusSchema = z.enum([
"pending",
"processing",
"shipped",
"delivered",
"cancelled",
]);
Preventing Injection Attacks¶
SQL Injection Prevention¶
# NEVER do this
query = f"SELECT * FROM users WHERE id = '{user_input}'" # VULNERABLE
# Do this instead
from sqlalchemy import select
result = await session.execute(
select(User).where(User.id == user_input) # Parameterized
)
# Or with raw SQL (parameterized)
result = await session.execute(
text("SELECT * FROM users WHERE id = :id"),
{"id": user_input}
)
Command Injection Prevention¶
import subprocess
import shlex
# NEVER do this
subprocess.run(f"echo {user_input}", shell=True) # VULNERABLE
# Do this instead
subprocess.run(["echo", user_input]) # Safe: no shell interpretation
# If shell is required, escape properly
subprocess.run(f"echo {shlex.quote(user_input)}", shell=True)
Path Traversal Prevention¶
from pathlib import Path
UPLOAD_DIR = Path("/var/uploads")
def get_safe_path(filename: str) -> Path:
# Remove any path components
safe_name = Path(filename).name
# Construct full path
full_path = UPLOAD_DIR / safe_name
# Verify it's still within upload directory
if not full_path.resolve().is_relative_to(UPLOAD_DIR.resolve()):
raise ValueError("Invalid filename")
return full_path
LDAP Injection Prevention¶
import re
def sanitize_ldap_input(value: str) -> str:
"""Escape special LDAP characters."""
escape_chars = {
"\\": r"\5c",
"*": r"\2a",
"(": r"\28",
")": r"\29",
"\x00": r"\00",
}
for char, escape in escape_chars.items():
value = value.replace(char, escape)
return value
Rate Limiting¶
from fastapi import FastAPI, Request, HTTPException
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/login")
@limiter.limit("5/minute") # Strict for auth endpoints
async def login(request: Request):
pass
@app.get("/api/items")
@limiter.limit("100/minute") # More lenient for regular endpoints
async def list_items(request: Request):
pass
Validation Error Handling¶
Error Handling Integration
For how validation errors integrate with the application's error hierarchy, see Error Handling Patterns.
FastAPI Error Response¶
from fastapi import FastAPI, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
app = FastAPI()
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request, exc):
errors = []
for error in exc.errors():
errors.append({
"field": ".".join(str(x) for x in error["loc"]),
"message": error["msg"],
})
return JSONResponse(
status_code=422,
content={
"error": "validation_error",
"message": "Invalid request data",
"details": errors,
}
)
Best Practices Summary¶
| Practice | Implementation |
|---|---|
| Validate on server | Never trust client validation alone |
| Use type coercion | Pydantic/Zod handle type conversion |
| Whitelist, don't blacklist | Define what's allowed, not what's forbidden |
| Validate early | Reject bad input before processing |
| Sanitize output | Escape for context (HTML, SQL, etc.) |
| Limit sizes | Max lengths, file sizes, array lengths |
| Check MIME types | Don't trust file extensions |
| Block internal URLs | Prevent SSRF with URL validation |
See Also¶
- Error Handling -- Exception hierarchy for surfacing validation failures
- Error Contracts -- How validation errors are formatted in API responses
- Frontend Security -- Client-side sanitization and XSS prevention