Skip to content

Connection Management

Optimize database connections for performance and reliability.

Connection Pooling Basics

Why Pooling?

Creating database connections is expensive: - TCP handshake - SSL negotiation - Authentication - Memory allocation

Connection pools maintain reusable connections.

Without pooling:
Request → Create Connection → Query → Close Connection → Response
                ↑ Expensive

With pooling:
Request → Get Connection from Pool → Query → Return to Pool → Response
                ↑ Fast

SQLAlchemy Async Configuration

Basic Setup

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_sessionmaker,
    create_async_engine,
)

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/dbname"

# Engine with an explicitly tuned connection pool.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,           # Connections to keep open
    max_overflow=10,        # Extra connections when pool is full
    pool_timeout=30,        # Seconds to wait for a connection
    pool_recycle=1800,      # Recycle connections after 30 min
    pool_pre_ping=True,     # Verify connection before use
    echo=False,             # Log SQL (True for debugging)
)

async_session = async_sessionmaker(
    engine,
    expire_on_commit=False,  # Keep ORM objects usable after commit
    class_=AsyncSession,     # Explicit (async_sessionmaker's default)
)

Pool Size Guidelines

Scenario pool_size max_overflow Total Max
Development 5 5 10
Small app 10 10 20
Medium app 20 10 30
High traffic 50+ 20 70+

Formula: total_connections = workers × (pool_size + max_overflow)

For 4 Uvicorn workers with pool_size=20 and max_overflow=10: up to 4 × 30 = 120 connections.

Connection Recycling

Prevent stale connections:

# Recycle pooled connections before any server/LB idle timeout kills them.
engine = create_async_engine(
    DATABASE_URL,
    pool_recycle=1800,   # 30 minutes; keep below server-side idle timeouts
    pool_pre_ping=True,  # Check connection before use
)

pool_pre_ping: Issues SELECT 1 before returning connection. Small overhead, prevents "connection closed" errors.

Asyncpg-Specific Tuning

Statement Caching

# Increase prepared statement cache (default: 100)
# Helps workloads reusing many distinct query shapes; each cached statement
# holds server-side resources, so don't oversize it.
engine = create_async_engine(
    DATABASE_URL,
    connect_args={
        "statement_cache_size": 500,
    }
)

Connection Parameters

engine = create_async_engine(
    DATABASE_URL,
    connect_args={
        "statement_cache_size": 500,
        "command_timeout": 60,        # Query timeout in seconds
        "server_settings": {
            "application_name": "sartiq-api",  # Visible in pg_stat_activity
            "jit": "off",             # Disable JIT for short queries
        }
    }
)

FastAPI Dependency Pattern

Session Per Request

from collections.abc import AsyncGenerator

from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """Yield a database session for the request.

    Commits on success; rolls back and re-raises on any error so the
    endpoint's exception handling still sees the original failure.
    """
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Usage
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)

Read-Only Sessions

from collections.abc import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db_readonly() -> AsyncGenerator[AsyncSession, None]:
    """Yield a read-only session (no commit).

    The SET TRANSACTION statement makes PostgreSQL reject writes for the
    duration of the transaction, catching accidental mutations early.
    """
    async with async_session() as session:
        # Set transaction to read-only
        await session.execute(text("SET TRANSACTION READ ONLY"))
        yield session
        # No commit needed for read-only

# Usage for read-heavy endpoints
@app.get("/reports/summary")
async def get_report(db: AsyncSession = Depends(get_db_readonly)):
    return await generate_report(db)

Read Replicas

Configuration

# Primary for writes
write_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@primary/db",
    pool_size=10,
)

# Replica for reads
# NOTE: replication lag applies — a read immediately after a write
# may not see that write on the replica.
read_engine = create_async_engine(
    "postgresql+asyncpg://user:pass@replica/db",
    pool_size=30,  # More capacity for reads
)

write_session = async_sessionmaker(write_engine)
read_session = async_sessionmaker(read_engine)

Dependency Injection

async def get_write_db():
    """Yield a primary-bound session; commit on success, roll back on error.

    Mirrors the error handling of get_db so write endpoints never leave a
    dangling transaction when they raise.
    """
    async with write_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

async def get_read_db():
    """Yield a replica-bound session; read-only usage, nothing to commit."""
    async with read_session() as session:
        yield session

# Write operations
@app.post("/users")
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_write_db)):
    ...

# Read operations
@app.get("/users")
async def list_users(db: AsyncSession = Depends(get_read_db)):
    ...

Connection Monitoring

Active Connections

-- Current connections by application
-- (requires application_name to be set in connect_args / server_settings)
SELECT
    application_name,
    state,
    COUNT(*) as connections
FROM pg_stat_activity
WHERE datname = 'sartiq'
GROUP BY application_name, state
ORDER BY connections DESC;

-- Connection details (non-idle sessions only)
SELECT
    pid,
    usename,
    application_name,
    client_addr,
    state,
    query_start,
    query
FROM pg_stat_activity
WHERE datname = 'sartiq'
  AND state != 'idle';

Pool Health Check

from sqlalchemy import text

async def check_db_health() -> dict:
    """Health check for database connection pool.

    Returns a status dict and never raises, so it can be wired directly
    into a /health endpoint: a broken database yields an "unhealthy"
    payload instead of a 500 from an unhandled exception.
    """
    try:
        async with async_session() as session:
            # Round-trip to the database to prove a connection works
            await session.execute(text("SELECT 1"))
    except Exception as exc:
        # Pool exhausted, database down, or network failure
        return {"status": "unhealthy", "error": str(exc)}

    # Pool status
    pool = engine.pool
    return {
        "status": "healthy",
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),
        "checked_out": pool.checkedout(),
        "overflow": pool.overflow(),
    }

Logging Slow Queries

import logging
import time
from sqlalchemy import event

logger = logging.getLogger("sqlalchemy.engine")

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    # Use a stack rather than a single slot so nested/overlapping executes
    # on the same connection don't clobber each other's start time
    # (the pattern recommended in the SQLAlchemy docs).
    conn.info.setdefault("query_start_time", []).append(time.time())

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.time() - conn.info["query_start_time"].pop()
    if elapsed > 0.1:  # Log queries over 100ms
        # Lazy %-formatting: the string is only built if the log is emitted
        logger.warning("Slow query (%.3fs): %s", elapsed, statement[:200])

PgBouncer

External connection pooler for high-scale deployments.

Why PgBouncer?

  • Connection multiplexing — Many clients share fewer database connections
  • Memory savings — PostgreSQL uses ~10MB per connection
  • Connection limits — PostgreSQL has a hard connection limit

Architecture

┌─────────┐   ┌─────────┐   ┌─────────┐
│ Worker1 │   │ Worker2 │   │ Worker3 │
│ pool=20 │   │ pool=20 │   │ pool=20 │
└────┬────┘   └────┬────┘   └────┬────┘
     │             │             │
     └─────────────┼─────────────┘
                   │ 60 connections
            ┌─────────────┐
            │  PgBouncer  │
            │  pool=30    │
            └──────┬──────┘
                   │ 30 connections
            ┌─────────────┐
            │ PostgreSQL  │
            └─────────────┘

Pool Modes

Mode Description Use Case
session Connection per client session Most compatible
transaction Connection per transaction Recommended
statement Connection per statement Limited compatibility

Configuration

# pgbouncer.ini
[databases]
sartiq = host=localhost port=5432 dbname=sartiq

[pgbouncer]
listen_port = 6432
listen_addr = 0.0.0.0
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt

pool_mode = transaction
default_pool_size = 30
max_client_conn = 1000
max_db_connections = 50

# Timeouts
server_idle_timeout = 300
query_timeout = 30

Application Configuration

# Point SQLAlchemy at PgBouncer
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost:6432/sartiq"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,      # Small pool, PgBouncer does the pooling
    max_overflow=0,   # Let PgBouncer handle overflow
    pool_pre_ping=True,
    connect_args={
        # Required with pool_mode=transaction: asyncpg's prepared-statement
        # cache assumes a stable server connection, which transaction
        # pooling breaks ("prepared statement does not exist" errors).
        "statement_cache_size": 0,
    },
)

Troubleshooting

"Too Many Connections"

-- Check current connections
SELECT COUNT(*) FROM pg_stat_activity WHERE datname = 'sartiq';

-- Check max connections
SHOW max_connections;

-- Kill connections idle for more than 10 minutes.
-- Use state_change, not query_start: for an idle backend, state_change
-- records when it went idle; query_start is when its *last query* began,
-- which would also kill connections that just finished a long query.
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'sartiq'
  AND state = 'idle'
  AND state_change < NOW() - INTERVAL '10 minutes';

Fix: 1. Reduce application pool sizes 2. Add PgBouncer 3. Increase max_connections (requires restart)

Connection Leaks

# Bad: Connection never returned
async def leaky_function():
    session = async_session()
    result = await session.execute(query)
    return result  # Session never closed — its connection stays checked out!

# Good: Always use context manager
async def safe_function():
    async with async_session() as session:
        result = await session.execute(query)
        return result  # Session closed (and connection returned) automatically

Connection Timeout

from sqlalchemy.exc import TimeoutError

try:
    async with async_session() as session:
        await session.execute(query)
except TimeoutError:
    # Pool exhausted: no connection freed up within pool_timeout seconds.
    # Surface as 503 so clients/load balancers know to back off and retry.
    logger.error("Database pool exhausted")
    raise HTTPException(503, "Service temporarily unavailable")

Best Practices

  1. Use connection pooling — Always, even for small apps
  2. Size pools appropriately — Too small = timeouts, too large = wasted resources
  3. Enable pool_pre_ping — Prevents stale connection errors
  4. Set pool_recycle — Refresh connections periodically
  5. Use read replicas — For read-heavy workloads
  6. Monitor pool metrics — Track checkout time and overflow
  7. Consider PgBouncer — For high-scale deployments
  8. Always use context managers — Prevent connection leaks

See Also