Connection Management¶
Optimize database connections for performance and reliability.
Connection Pooling Basics¶
Why Pooling?¶
Creating database connections is expensive:

- TCP handshake
- SSL negotiation
- Authentication
- Memory allocation
Connection pools maintain reusable connections.
Without pooling:

```text
Request → Create Connection → Query → Close Connection → Response
          ↑ Expensive
```

With pooling:

```text
Request → Get Connection from Pool → Query → Return to Pool → Response
          ↑ Fast
```
SQLAlchemy Async Configuration¶
Basic Setup¶
```python
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,        # Connections to keep open
    max_overflow=10,     # Extra connections when the pool is full
    pool_timeout=30,     # Seconds to wait for a connection
    pool_recycle=1800,   # Recycle connections after 30 min
    pool_pre_ping=True,  # Verify connection before use
    echo=False,          # Log SQL (True for debugging)
)

async_session = async_sessionmaker(
    engine,
    expire_on_commit=False,
    class_=AsyncSession,
)
```
Pool Size Guidelines¶
| Scenario | pool_size | max_overflow | Total Max |
|---|---|---|---|
| Development | 5 | 5 | 10 |
| Small app | 10 | 10 | 20 |
| Medium app | 20 | 10 | 30 |
| High traffic | 50+ | 20 | 70+ |
Formula: `total_connections = workers × (pool_size + max_overflow)`

For 4 Uvicorn workers with pool_size=20 and no overflow: 80 connections max. With max_overflow=10, the ceiling rises to 120; keep the total below PostgreSQL's max_connections (default 100).
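This arithmetic is worth automating as a deploy-time sanity check. A small sketch (the worker and pool numbers are examples, not recommendations):

```python
def max_db_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections an app tier can open against the database."""
    return workers * (pool_size + max_overflow)

# 4 Uvicorn workers, each with pool_size=20 and max_overflow=10
total = max_db_connections(workers=4, pool_size=20, max_overflow=10)
print(total)  # → 120

POSTGRES_MAX_CONNECTIONS = 100  # PostgreSQL default
if total > POSTGRES_MAX_CONNECTIONS:
    print("Pool config exceeds max_connections: shrink pools or add PgBouncer")
```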
Connection Recycling¶
Prevent stale connections:
```python
engine = create_async_engine(
    DATABASE_URL,
    pool_recycle=1800,   # 30 minutes
    pool_pre_ping=True,  # Check connection before use
)
```

pool_pre_ping issues a lightweight test query (e.g. `SELECT 1`) before handing out a connection. The overhead is small, and it prevents "connection closed" errors from stale connections.
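Conceptually, pre-ping works like the following sketch. This is a toy stand-in, not SQLAlchemy's implementation:

```python
class FakeConnection:
    """Toy connection that can be killed to simulate a stale socket."""
    def __init__(self) -> None:
        self.alive = True

    def execute(self, sql: str) -> str:
        if not self.alive:
            raise ConnectionError("connection closed")
        return "ok"

def checkout_with_preping(conn: FakeConnection) -> FakeConnection:
    """Ping the pooled connection; replace it transparently if the ping fails."""
    try:
        conn.execute("SELECT 1")     # the pre-ping
        return conn                  # healthy: reuse it
    except ConnectionError:
        return FakeConnection()      # stale: discard and open a fresh one

stale = FakeConnection()
stale.alive = False                  # e.g. the server restarted overnight
fresh = checkout_with_preping(stale)
print(fresh.alive)  # → True
```

The application never sees the dead connection; it only pays one extra round trip per checkout.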
Asyncpg-Specific Tuning¶
Statement Caching¶
```python
# Increase the prepared statement cache (asyncpg default: 100)
engine = create_async_engine(
    DATABASE_URL,
    connect_args={
        "statement_cache_size": 500,
    },
)
```
Connection Parameters¶
```python
engine = create_async_engine(
    DATABASE_URL,
    connect_args={
        "statement_cache_size": 500,
        "command_timeout": 60,        # Query timeout in seconds
        "server_settings": {
            "application_name": "sartiq-api",
            "jit": "off",             # Disable JIT for short queries
        },
    },
)
```
FastAPI Dependency Pattern¶
Session Per Request¶
```python
from collections.abc import AsyncGenerator

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the request."""
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Usage
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)
```
Read-Only Sessions¶
```python
from collections.abc import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db_readonly() -> AsyncGenerator[AsyncSession, None]:
    """Read-only session (no commit)."""
    async with async_session() as session:
        # Set the transaction to read-only
        await session.execute(text("SET TRANSACTION READ ONLY"))
        yield session
        # No commit needed for read-only work

# Usage for read-heavy endpoints
@app.get("/reports/summary")
async def get_report(db: AsyncSession = Depends(get_db_readonly)):
    return await generate_report(db)
```
Read Replicas¶
Configuration¶
```python
# Primary for writes
write_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@primary/db",
    pool_size=10,
)

# Replica for reads
read_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@replica/db",
    pool_size=30,  # More capacity for reads
)

write_session = async_sessionmaker(write_engine)
read_session = async_sessionmaker(read_engine)
```
Dependency Injection¶
```python
async def get_write_db():
    async with write_session() as session:
        yield session
        await session.commit()

async def get_read_db():
    async with read_session() as session:
        yield session

# Write operations
@app.post("/users")
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_write_db)):
    ...

# Read operations
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_read_db)):
    ...
```
Connection Monitoring¶
Active Connections¶
```sql
-- Current connections by application
SELECT
    application_name,
    state,
    COUNT(*) AS connections
FROM pg_stat_activity
WHERE datname = 'sartiq'
GROUP BY application_name, state
ORDER BY connections DESC;

-- Connection details
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    state,
    query_start,
    query
FROM pg_stat_activity
WHERE datname = 'sartiq'
  AND state != 'idle';
```
Pool Health Check¶
```python
from sqlalchemy import text

async def check_db_health() -> dict:
    """Health check for the database connection pool."""
    async with async_session() as session:
        # Test query
        await session.execute(text("SELECT 1"))

    # Pool status
    pool = engine.pool
    return {
        "status": "healthy",
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
    }
```
Logging Slow Queries¶
```python
import logging
import time

from sqlalchemy import event

logger = logging.getLogger("sqlalchemy.engine")

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info["query_start_time"] = time.time()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.time() - conn.info["query_start_time"]
    if elapsed > 0.1:  # Log queries over 100ms
        logger.warning(f"Slow query ({elapsed:.3f}s): {statement[:200]}")
```
PgBouncer¶
External connection pooler for high-scale deployments.
Why PgBouncer?¶
- Connection multiplexing — Many clients share fewer database connections
- Memory savings — PostgreSQL uses ~10MB per connection
- Connection limits — PostgreSQL has a hard connection limit
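The memory argument is simple arithmetic. A sketch using the ~10MB-per-backend rule of thumb from the list above (actual per-connection memory varies with work_mem and workload):

```python
MB_PER_CONNECTION = 10  # rough PostgreSQL per-backend overhead (rule of thumb)

def backend_memory_mb(connections: int) -> int:
    """Approximate server memory consumed by N PostgreSQL backends."""
    return connections * MB_PER_CONNECTION

# 1000 app clients connecting directly vs. 30 pooled PgBouncer connections
direct = backend_memory_mb(1000)  # → 10000 MB (~10 GB)
pooled = backend_memory_mb(30)    # → 300 MB
print(direct, pooled)
```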
Architecture¶
```text
┌─────────┐   ┌─────────┐   ┌─────────┐
│ Worker1 │   │ Worker2 │   │ Worker3 │
│ pool=20 │   │ pool=20 │   │ pool=20 │
└────┬────┘   └────┬────┘   └────┬────┘
     │             │             │
     └─────────────┼─────────────┘
                   │ 60 connections
                   ▼
            ┌─────────────┐
            │  PgBouncer  │
            │   pool=30   │
            └──────┬──────┘
                   │ 30 connections
                   ▼
            ┌─────────────┐
            │ PostgreSQL  │
            └─────────────┘
```
Pool Modes¶
| Mode | Description | Use Case |
|---|---|---|
| session | Connection per client session | Most compatible |
| transaction | Connection per transaction | Recommended |
| statement | Connection per statement | Limited compatibility |
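Why transaction mode multiplexes so well comes down to duty cycle: clients spend only a small fraction of their time inside a transaction. A rough model (the numbers are illustrative, not a sizing tool):

```python
import math

def server_conns_needed(clients: int, duty_cycle: float, mode: str) -> int:
    """Rough server-connection demand under PgBouncer pool modes.

    duty_cycle: fraction of time a client is actually inside a transaction.
    """
    if mode == "session":
        return clients                          # one server conn pinned per client
    if mode == "transaction":
        return math.ceil(clients * duty_cycle)  # conns shared between transactions
    raise ValueError(f"unknown mode: {mode}")

# 1000 clients, each inside a transaction ~2% of the time
print(server_conns_needed(1000, 0.02, "session"))      # → 1000
print(server_conns_needed(1000, 0.02, "transaction"))  # → 20
```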
Configuration¶
```ini
; pgbouncer.ini
[databases]
sartiq = host=localhost port=5432 dbname=sartiq

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
pool_mode = transaction
default_pool_size = 30
max_client_conn = 1000
max_db_connections = 50

; Timeouts
server_idle_timeout = 300
query_timeout = 30
```
Application Configuration¶
```python
# Point SQLAlchemy at PgBouncer
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:6432/sartiq"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,        # Small pool; PgBouncer does the pooling
    max_overflow=0,     # Let PgBouncer handle overflow
    pool_pre_ping=True,
    connect_args={
        # asyncpg's prepared-statement cache breaks under transaction
        # pooling, because statements are prepared per server connection
        "statement_cache_size": 0,
    },
)
```
Troubleshooting¶
"Too Many Connections"¶
```sql
-- Check current connections
SELECT COUNT(*) FROM pg_stat_activity WHERE datname = 'sartiq';

-- Check max connections
SHOW max_connections;

-- Kill connections idle for more than 10 minutes
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'sartiq'
  AND state = 'idle'
  AND state_change < NOW() - INTERVAL '10 minutes';
```
Fix:
1. Reduce application pool sizes
2. Add PgBouncer
3. Increase max_connections (requires restart)
Connection Leaks¶
```python
# Bad: connection never returned
async def leaky_function():
    session = async_session()
    result = await session.execute(query)
    return result  # Session never closed!

# Good: always use a context manager
async def safe_function():
    async with async_session() as session:
        result = await session.execute(query)
        return result  # Session closed automatically
```
Connection Timeout¶
```python
from fastapi import HTTPException
from sqlalchemy.exc import TimeoutError

try:
    async with async_session() as session:
        await session.execute(query)
except TimeoutError:
    # Pool exhausted: couldn't get a connection in time
    logger.error("Database pool exhausted")
    raise HTTPException(503, "Service temporarily unavailable")
```
Best Practices¶
- Use connection pooling — Always, even for small apps
- Size pools appropriately — Too small = timeouts, too large = wasted resources
- Enable pool_pre_ping — Prevents stale connection errors
- Set pool_recycle — Refresh connections periodically
- Use read replicas — For read-heavy workloads
- Monitor pool metrics — Track checkout time and overflow
- Consider PgBouncer — For high-scale deployments
- Always use context managers — Prevent connection leaks
See Also¶
- Database Monitoring -- Tracking connection pool health, slow queries, and replication lag
- Performance & Caching -- Query optimization and caching strategies that rely on healthy connections