Optimization Patterns: Caching, Batching, and N+1 Prevention¶
Common techniques for improving application performance.
Caching¶
Application-Level Cache¶
from functools import lru_cache
import asyncio
# Simple in-memory cache
@lru_cache(maxsize=1000)
def get_user_permissions(user_id: int) -> list[str]:
return fetch_from_db(user_id)
# Async cache with TTL
from cachetools import TTLCache
cache = TTLCache(maxsize=1000, ttl=300) # 5 minutes
async def get_cached_data(key: str):
if key in cache:
return cache[key]
data = await fetch_data(key)
cache[key] = data
return data
Redis Cache¶
import redis
import json
redis_client = redis.Redis(host='localhost', port=6379, db=0)
async def get_with_cache(key: str, fetch_fn, ttl: int = 300):
# Try cache first
cached = redis_client.get(key)
if cached:
return json.loads(cached)
# Fetch and cache
data = await fetch_fn()
redis_client.setex(key, ttl, json.dumps(data))
return data
# Cache invalidation
def invalidate_user_cache(user_id: int):
redis_client.delete(f"user:{user_id}")
redis_client.delete(f"user:{user_id}:profile")
Cache-Aside Pattern¶
async def get_user(user_id: int) -> User:
cache_key = f"user:{user_id}"
# 1. Check cache
cached = await cache.get(cache_key)
if cached:
return User(**cached)
# 2. Load from database
user = await db.get(User, user_id)
if not user:
return None
# 3. Populate cache
await cache.set(cache_key, user.dict(), ttl=300)
return user
async def update_user(user_id: int, data: dict):
# 1. Update database
await db.execute(
update(User).where(User.id == user_id).values(**data)
)
# 2. Invalidate cache
await cache.delete(f"user:{user_id}")
Database Optimization¶
Preventing N+1 Queries¶
# Bad: N+1 queries
users = await db.execute(select(User))
for user in users:
posts = await db.execute(select(Post).where(Post.user_id == user.id))
# Good: Eager loading
users = await db.execute(
select(User).options(selectinload(User.posts))
)
Batching¶
# Bad: Individual inserts
for item in items:
db.add(Item(**item))
await db.commit()
# Good: Batch insert
db.add_all([Item(**item) for item in items])
await db.commit()
# Even better: Bulk insert
await db.execute(
insert(Item),
[{"name": i["name"], "value": i["value"]} for i in items]
)
Query Optimization¶
# Bad: Select all columns
users = await db.execute(select(User))
# Good: Select only needed columns
users = await db.execute(
select(User.id, User.name, User.email)
.where(User.is_active == True)
)
# Use indexes
# CREATE INDEX idx_users_email ON users(email);
# Avoid functions on indexed columns
# Bad: WHERE LOWER(email) = 'test@example.com'
# Good: WHERE email = 'test@example.com' (use case-insensitive collation)
Connection Pooling¶
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
DATABASE_URL,
pool_size=20, # Persistent connections
max_overflow=10, # Extra connections when needed
pool_pre_ping=True, # Verify connection health
pool_recycle=1800, # Recycle after 30 minutes
)
HTTP Optimization¶
Response Compression¶
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
app = FastAPI()
app.add_middleware(GZipMiddleware, minimum_size=1000)
HTTP Caching Headers¶
from fastapi import Response
@app.get("/static-data")
async def get_static_data(response: Response):
response.headers["Cache-Control"] = "public, max-age=3600"
return data
@app.get("/user/{user_id}")
async def get_user(user_id: int, response: Response):
user = await fetch_user(user_id)
response.headers["ETag"] = f'"{hash(user.updated_at)}"'
response.headers["Cache-Control"] = "private, max-age=60"
return user
Pagination¶
from fastapi import Query
@app.get("/users")
async def list_users(
page: int = Query(1, ge=1),
per_page: int = Query(20, ge=1, le=100),
):
offset = (page - 1) * per_page
total = await db.scalar(select(func.count(User.id)))
users = await db.execute(
select(User)
.order_by(User.created_at.desc())
.offset(offset)
.limit(per_page)
)
return {
"data": users.scalars().all(),
"pagination": {
"page": page,
"per_page": per_page,
"total": total,
"pages": (total + per_page - 1) // per_page,
}
}
Frontend Optimization¶
Code Splitting¶
import { lazy, Suspense } from 'react';
// Route-based splitting
const Dashboard = lazy(() => import('./pages/Dashboard'));
const Settings = lazy(() => import('./pages/Settings'));
function App() {
return (
<Suspense fallback={<Loading />}>
<Routes>
<Route path="/dashboard" element={<Dashboard />} />
<Route path="/settings" element={<Settings />} />
</Routes>
</Suspense>
);
}
Memoization¶
import { memo, useMemo, useCallback } from 'react';
// Memoize component
const UserCard = memo(function UserCard({ user }) {
return <div>{user.name}</div>;
});
// Memoize values
function UserList({ users, filter }) {
const filteredUsers = useMemo(
() => users.filter(u => u.name.includes(filter)),
[users, filter]
);
const handleClick = useCallback((id) => {
// ...
}, []);
return filteredUsers.map(user => (
<UserCard key={user.id} user={user} onClick={handleClick} />
));
}
Virtualization¶
import { useVirtualizer } from '@tanstack/react-virtual';
function VirtualList({ items }) {
const parentRef = useRef(null);
const virtualizer = useVirtualizer({
count: items.length,
getScrollElement: () => parentRef.current,
estimateSize: () => 50,
overscan: 5,
});
return (
<div ref={parentRef} style={{ height: 400, overflow: 'auto' }}>
<div style={{ height: virtualizer.getTotalSize(), position: 'relative' }}>
{virtualizer.getVirtualItems().map((virtualItem) => (
<div
key={virtualItem.key}
style={{
position: 'absolute',
top: 0,
left: 0,
width: '100%',
transform: `translateY(${virtualItem.start}px)`,
}}
>
{items[virtualItem.index].name}
</div>
))}
</div>
</div>
);
}
Image Optimization¶
// Next.js Image
import Image from 'next/image';
<Image
src="/hero.jpg"
width={1200}
height={600}
placeholder="blur"
blurDataURL={blurDataUrl}
priority={isAboveFold}
loading={isAboveFold ? 'eager' : 'lazy'}
/>
Async Optimization¶
Parallel Execution¶
import asyncio
# Bad: Sequential
user = await get_user(user_id)
posts = await get_posts(user_id)
comments = await get_comments(user_id)
# Good: Parallel
user, posts, comments = await asyncio.gather(
get_user(user_id),
get_posts(user_id),
get_comments(user_id),
)
Background Tasks¶
from fastapi import BackgroundTasks
@app.post("/orders")
async def create_order(order: OrderCreate, background_tasks: BackgroundTasks):
# Create order (fast)
order = await create_order_in_db(order)
# Queue slow tasks
background_tasks.add_task(send_confirmation_email, order.id)
background_tasks.add_task(update_inventory, order.items)
return order # Return immediately
Patterns Summary¶
| Pattern | Use Case | Benefit |
|---|---|---|
| Caching | Repeated reads | Reduce DB load |
| Batching | Multiple writes | Reduce round trips |
| Pagination | Large datasets | Limit data transfer |
| Eager loading | Related data | Prevent N+1 |
| Code splitting | Large bundles | Faster initial load |
| Virtualization | Long lists | Reduce DOM nodes |
| Parallel execution | Independent ops | Reduce latency |
| Background tasks | Slow operations | Faster responses |