Design Patterns & Best Practices¶
Abstract Base Classes + Mixins Pattern¶
Use this pattern to separate contracts from utilities:
- Abstract Base Class: Defines the required interface (what must be implemented)
- Mixin Class: Provides optional utility methods (what can be reused)
- Benefits: Interface segregation, optional adoption, better testability
from abc import ABC, abstractmethod
# Contract definition
class BaseProcessor(ABC):
@abstractmethod
async def process(self, request): pass
# Optional utilities
class ProcessorMixin:
def _create_success_response(self, data):
return {"success": True, "data": data}
def _create_error_response(self, error):
return {"success": False, "error": str(error)}
# Implementation can choose utilities
class MyProcessor(BaseProcessor, ProcessorMixin):
async def process(self, request):
result = await self._do_processing(request)
return self._create_success_response(result) # Uses mixin
Factory Pattern¶
Use factories for object creation with configuration validation:
from fastapi import Depends
class NotificationSender(ABC):
@abstractmethod
async def send(self, recipient: str, message: str) -> None: ...
class EmailSender(NotificationSender):
async def send(self, recipient: str, message: str) -> None:
await self.smtp_client.send(to=recipient, body=message)
class SmsSender(NotificationSender):
async def send(self, recipient: str, message: str) -> None:
await self.sms_client.send(to=recipient, body=message)
_senders: dict[str, type[NotificationSender]] = {
"email": EmailSender,
"sms": SmsSender,
}
def get_notification_sender(channel: str) -> NotificationSender:
sender_cls = _senders.get(channel)
if not sender_cls:
raise ValueError(f"Unknown notification channel: {channel}")
return sender_cls()
Async-First Design¶
All I/O operations must be async. Use proper async patterns throughout:
import asyncio
from contextlib import asynccontextmanager
# Async context manager for resource cleanup
@asynccontextmanager
async def managed_connection(url: str):
conn = await create_connection(url)
try:
yield conn
finally:
await conn.close()
# Parallel I/O with asyncio.gather
async def enrich_user(user: User) -> EnrichedUser:
profile, preferences, recent_activity = await asyncio.gather(
profile_service.get(user.id),
preferences_service.get(user.id),
activity_service.get_recent(user.id),
)
return EnrichedUser(user=user, profile=profile, preferences=preferences, activity=recent_activity)
Rules:
- Prefer async libraries (aiohttp over requests, asyncpg over psycopg2)
- Design for concurrency from the start
- Use
asyncio.gatherfor independent I/O operations - Never call blocking I/O inside an async function without
run_in_executor
Validation Patterns¶
- Construction-time validation: Validate immutable properties in
__post_init__ - Runtime validation: Validate mutable state before processing
- Input validation: Validate external inputs at system boundaries
- Fail-fast principle: Catch errors as early as possible
Layered Architecture¶
All backend code follows a strict Route → Service → Repository → DB layered architecture:
Route / Event Handler / Script / Background Task
│
▼
Service Layer ← business logic, orchestration, domain events
│
▼
Repository Layer ← data access (CRUD base + domain queries)
│
▼
Database (SQLAlchemy models)
Layer Responsibilities¶
| Layer | Does | Does NOT |
|---|---|---|
| Route/Handler | HTTP parsing, auth, validation, response formatting | Business logic, direct DB access |
| Service | Business rules, orchestration, event publishing | HTTP concerns, SQL queries |
| Repository | Data access, query construction, CRUD + domain queries | Business logic, HTTP concerns |
| Models | Schema definition, relationships, constraints | Query logic, business rules |
Strict Rules¶
- All callers (routes, events, scripts, tasks) access data through services — no exceptions, even for trivial reads
- Only services call repositories — never routes or event handlers directly
- Services can call other services for synchronous orchestration and reads
- No circular service dependencies — if A calls B, B must not call A (use events instead)
- Side effects (emails, analytics, notifications) go through domain events, not direct cross-service mutations
- Repositories never contain business logic — only data access and query construction
Repository Pattern¶
Use a generic base repository for CRUD operations and extend with domain-specific queries:
class BaseRepository(Generic[T]):
async def create(self, data: dict) -> T: ...
async def get(self, id: UUID) -> T | None: ...
async def get_multi(self, filters: dict) -> list[T]: ...
async def update(self, id: UUID, data: dict) -> T: ...
async def delete(self, id: UUID) -> None: ...
class UserRepository(BaseRepository[User]):
async def get_active_by_organization(self, org_id: UUID) -> list[User]: ...
async def exists_by_email(self, email: str) -> bool: ...
Service-to-Service Communication¶
| Scenario | Pattern | Example |
|---|---|---|
| Synchronous orchestration | Direct service call | Order creation checks inventory |
| Cross-domain reads | Direct service call | Order service reads user info |
| Async side effects | Domain events | User created → send welcome email |
| Cross-domain state changes | Domain events | Payment confirmed → update inventory |
class UserService:
async def create_user(self, data: CreateUserInput) -> User:
user = await self.user_repo.create(data.dict())
await self.event_bus.publish("user.created", {"user_id": user.id})
return user
Anti-Patterns¶
- Route calling repository directly — skips service layer, scatters business logic
- Business logic in repository — validation, orchestration, or conditional logic belongs in services
- Circular service dependencies — if two services need each other, use domain events to break the cycle
- Domain events for synchronous workflows — events are fire-and-forget; use direct service calls when you need return values
See Also¶
- Dependency Injection -- How services and repositories are wired via FastAPI's Depends
- Database Patterns -- SQLAlchemy async patterns used by the repository layer