Backend API Architecture¶
The Backend API is the central orchestration layer of the Sartiq platform, handling all business logic, data management, and coordination between system components.
Overview¶
| Aspect | Details |
|---|---|
| Framework | FastAPI |
| Language | Python 3.12 |
| ORM | SQLModel + SQLAlchemy |
| Task Queue | Celery with Redis broker |
| Scheduler | APScheduler |
| Port | 8000 |
Architecture Diagram¶
```mermaid
flowchart TB
    subgraph API["API Layer"]
        Routes[FastAPI Routes]
        WS[WebSocket Manager]
        Auth[Auth Middleware]
    end
    subgraph Services["Service Layer"]
        GenService[Generation Service]
        ShootingService[Shooting Service]
        ProductService[Product Service]
        ExportService[Export Service]
        ComputeClient[Compute Client]
    end
    subgraph Data["Data Layer"]
        Models[SQLModel Models]
        Repos[Repositories]
    end
    subgraph Background["Background Processing"]
        Celery[Celery Workers]
        Scheduler[APScheduler]
    end
    subgraph External["External"]
        Compute[Compute Server]
        DB[(PostgreSQL)]
        Redis[(Redis)]
        R2[(Cloudflare R2)]
    end
    Routes --> Auth
    Auth --> Services
    WS --> Redis
    Services --> Repos
    Repos --> Models
    Models --> DB
    Services --> Celery
    Services --> ComputeClient
    ComputeClient --> Compute
    Celery --> Redis
    Scheduler --> Services
    Services --> R2
```
Layered Architecture¶
The backend follows a clean layered architecture with clear separation of concerns.
Layer Overview¶
| Layer | Location | Responsibility |
|---|---|---|
| Routes | `app/api/routes/` | HTTP endpoints, request/response handling |
| Services | `app/services/` | Business logic, orchestration |
| Models | `app/models/` | Database entities (SQLModel) |
| Schemas | `app/schemas/` | Request/response DTOs (Pydantic) |
| Repositories | `app/repositories/` | Data access patterns |
Dependency Flow¶
```mermaid
flowchart TB
    Routes[Routes Layer]
    Services[Service Layer]
    Repos[Repository Layer]
    Models[Model Layer]
    DB[(Database)]
    Routes --> Services
    Services --> Repos
    Repos --> Models
    Models --> DB
```
Routes Layer¶
HTTP endpoints organized by domain.
Route Structure¶
```
app/api/routes/
├── __init__.py
├── auth.py              # Authentication endpoints
├── organizations.py     # Organization management
├── products.py          # Product CRUD
├── subjects.py          # Subject management
├── shootings.py         # Shooting management
├── shooting_looks.py    # Shooting look operations
├── generations.py       # Generation control
├── gallery.py           # Image gallery
├── exports.py           # Export operations
├── uploads.py           # Presigned URL generation, file uploads
├── files.py             # File serving (302 redirect to CDN)
├── bulk_upload.py       # CSV bulk import
├── styles.py            # Style management
├── shots.py             # Shot management
├── captions.py          # Caption generation
└── admin/               # Admin-only routes
    ├── users.py
    └── settings.py
```
Route Pattern Example¶
```python
# app/api/routes/products.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException

from app.api.deps import get_current_user, get_product_service
from app.models.user import User
from app.schemas.product import ProductCreate, ProductRead
from app.services.product import ProductService

router = APIRouter(prefix="/products", tags=["products"])


@router.post("/", response_model=ProductRead)
async def create_product(
    product_in: ProductCreate,
    current_user: User = Depends(get_current_user),
    service: ProductService = Depends(get_product_service),
):
    """Create a new product."""
    return await service.create(product_in, organization_id=current_user.organization_id)


@router.get("/{product_id}", response_model=ProductRead)
async def get_product(
    product_id: UUID,
    current_user: User = Depends(get_current_user),
    service: ProductService = Depends(get_product_service),
):
    """Get a product by ID."""
    product = await service.get(product_id)
    if not product or product.organization_id != current_user.organization_id:
        raise HTTPException(status_code=404, detail="Product not found")
    return product
```
Service Layer¶
Business logic encapsulated in service classes.
Key Services¶
| Service | Responsibility |
|---|---|
| `GenerationWorkflowService` | Orchestrates generation lifecycle |
| `ShootingService` | Shooting CRUD and status management |
| `ShootingLookService` | Look configuration and shot management |
| `ProductService` | Product catalog operations |
| `SubjectService` | Subject management |
| `ExportService` | Export to external systems |
| `MediaResourceService` | File ingestion, deduplication, deletion, orphan scanning |
| `ComputeServerClient` | Communication with the Compute Server |
Service Pattern¶
```python
# app/services/generation_workflow.py
class GenerationWorkflowService:
    def __init__(
        self,
        db: AsyncSession,
        compute_client: ComputeServerClient,
        redis: Redis,
    ):
        self.db = db
        self.compute_client = compute_client
        self.redis = redis

    async def start_generation(
        self,
        shooting_look_id: UUID,
        shot_type_id: UUID,
        config: GenerationConfig,
    ) -> Generation:
        """Start a new generation workflow."""
        # 1. Validate inputs
        look = await self._get_look(shooting_look_id)
        shot_type = await self._get_shot_type(shot_type_id)

        # 2. Create generation record
        generation = await self._create_generation(look, shot_type, config)

        # 3. Submit to compute server
        workflow_id = await self.compute_client.submit_workflow(
            generation_id=generation.id,
            product=look.main_product,
            subject=look.subject,
            style=look.style,
            shot_type=shot_type,
            config=config,
        )

        # 4. Update generation with workflow ID
        generation.workflow_id = workflow_id
        await self.db.commit()

        # 5. Publish event
        await self._publish_event("generation.started", generation)

        return generation
```
Models Layer¶
Database entities using SQLModel (SQLAlchemy + Pydantic).
Model Structure¶
```
app/models/
├── __init__.py
├── base.py              # Base model with common fields
├── organization.py
├── user.py
├── product.py
├── subject.py
├── shooting.py
├── shooting_look.py
├── shot.py
├── generation.py
├── prediction.py
├── style.py
├── guideline.py
├── shot_type.py
└── media_resource.py    # MediaResource, MediaResourceAttachment, MediaDeletionDeadLetter
```
Model Example¶
```python
# app/models/generation.py
from datetime import datetime
from enum import Enum
from uuid import UUID, uuid4

from sqlalchemy import JSON, Column
from sqlmodel import SQLModel, Field, Relationship


class GenerationStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class Generation(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    shot_id: UUID = Field(foreign_key="shot.id")
    workflow_id: str | None = None
    status: GenerationStatus = GenerationStatus.PENDING
    config: dict = Field(default_factory=dict, sa_column=Column(JSON))
    error_message: str | None = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    completed_at: datetime | None = None

    # Relationships
    shot: "Shot" = Relationship(back_populates="generations")
    predictions: list["Prediction"] = Relationship(back_populates="generation")
```
Schemas Layer¶
Request/response data transfer objects.
Schema Organization¶
```
app/schemas/
├── __init__.py
├── base.py              # Common schemas
├── product.py           # ProductCreate, ProductRead, ProductUpdate
├── shooting.py
├── generation.py
└── pagination.py        # Paginated response schemas
```
Schema Pattern¶
```python
# app/schemas/product.py
from datetime import datetime
from uuid import UUID

from pydantic import BaseModel


class ProductBase(BaseModel):
    name: str
    sku: str | None = None
    category: str | None = None
    metadata: dict = {}


class ProductCreate(ProductBase):
    """Schema for creating a product."""


class ProductUpdate(BaseModel):
    """Schema for updating a product (all fields optional)."""
    name: str | None = None
    sku: str | None = None
    category: str | None = None
    metadata: dict | None = None


class ProductRead(ProductBase):
    """Schema for reading a product."""
    id: UUID
    organization_id: UUID
    primary_image_url: str | None
    created_at: datetime
    updated_at: datetime

    class Config:
        from_attributes = True
```
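The all-optional `ProductUpdate` enables PATCH-style partial updates: only the fields the client actually sent get applied, which Pydantic supports via sparse dumps such as `model_dump(exclude_unset=True)`. Reduced to plain dicts, the merge looks like this (a sketch of the semantics, not the project's actual update code):

```python
def apply_partial_update(current: dict, patch: dict) -> dict:
    """Merge a sparse PATCH payload into the current record.

    Keys absent from `patch` are left untouched; keys present in it,
    even with a None value, overwrite the current value.
    """
    updated = dict(current)
    updated.update(patch)
    return updated


product = {"name": "Denim Jacket", "sku": "DJ-001", "category": "outerwear"}
patch = {"sku": None, "category": "jackets"}   # client sent only these two fields
print(apply_partial_update(product, patch))
# {'name': 'Denim Jacket', 'sku': None, 'category': 'jackets'}
```

The distinction between "field omitted" and "field explicitly set to null" is exactly what a sparse payload preserves and a naive full-object PUT loses.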
Background Processing¶
Celery Workers¶
Asynchronous task processing for long-running operations.
```mermaid
flowchart LR
    subgraph API
        Routes[API Routes]
    end
    subgraph Queue["Redis Queue"]
        Q1[default]
        Q2[high-priority]
        Q3[low-priority]
    end
    subgraph Workers["Celery Workers"]
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker 3]
    end
    Routes -->|enqueue| Q1
    Routes -->|enqueue| Q2
    Q1 --> W1
    Q1 --> W2
    Q2 --> W1
    Q3 --> W3
```
Task Types¶
| Queue | Tasks |
|---|---|
| `high-priority` | Event processing, real-time updates |
| `default` | Standard background operations |
| `low-priority` | Batch exports, cleanup tasks |
Task Example¶
```python
# app/tasks/generations.py
from celery import shared_task

from app.services.generation_workflow import GenerationWorkflowService


@shared_task(bind=True, max_retries=3)
def process_generation_complete(self, generation_id: str, result: dict):
    """Process generation completion from compute server."""
    try:
        service = GenerationWorkflowService(...)
        service.handle_completion(generation_id, result)
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```
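The task above retries with a fixed 60-second delay. A common refinement, sketched here as an option rather than something the codebase does, is exponential backoff with a cap, so a compute-server outage is not hammered at a constant rate. The delay for retry N would be computed like this and passed as `countdown=retry_countdown(self.request.retries)`:

```python
def retry_countdown(retry_number: int, base: int = 60, cap: int = 960) -> int:
    """Delay in seconds before retry N (0-indexed): base * 2**N, capped."""
    return min(base * 2 ** retry_number, cap)


print([retry_countdown(n) for n in range(5)])  # [60, 120, 240, 480, 960]
```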
APScheduler¶
Periodic tasks and scheduled jobs.
```python
# app/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()


@scheduler.scheduled_job('interval', hours=1)
async def cleanup_expired_sessions():
    """Remove expired user sessions."""
    ...


@scheduler.scheduled_job('cron', hour=2)
async def daily_usage_report():
    """Generate daily usage reports."""
    ...
```
Real-Time Updates¶
WebSocket + Redis Pub/Sub¶
```mermaid
sequenceDiagram
    participant Client
    participant WS as WebSocket
    participant Backend
    participant Redis
    participant Compute
    Client->>WS: Connect
    WS->>Backend: Authenticate
    Backend->>Redis: Subscribe to user channel
    Note over Compute: Generation completes
    Compute->>Redis: Emit task.completed
    Redis->>Backend: Event received
    Backend->>Compute: GET /tasks/{id}/result
    Compute-->>Backend: Task result
    Backend->>Redis: Publish to user channel
    Redis->>Backend: Event received
    Backend->>WS: Send to client
    WS->>Client: generation.completed
```
WebSocket Manager¶
```python
# app/websocket/manager.py
class WebSocketManager:
    def __init__(self, redis: Redis):
        self.redis = redis
        self.connections: dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket
        # Subscribe to user's channel
        await self.redis.subscribe(f"user:{user_id}")

    async def broadcast_to_user(self, user_id: str, event: dict):
        if ws := self.connections.get(user_id):
            await ws.send_json(event)

    async def handle_redis_messages(self):
        """Listen for Redis pub/sub messages and forward to WebSocket."""
        async for message in self.redis.listen():
            user_id = message["channel"].split(":")[1]
            await self.broadcast_to_user(user_id, message["data"])
```
External Integrations¶
Compute Server Client¶
Communicates with the Compute Server for AI operations.
```python
# app/services/compute_client.py
class ComputeServerClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient()

    async def submit_task(
        self,
        generation_id: UUID,
        product: Product,
        subject: Subject,
        style: Style,
        shot_type: ShotType,
        config: GenerationConfig,
    ) -> str:
        """Submit a generation task to the compute server."""
        response = await self.client.post(
            f"{self.base_url}/api/v1/tasks",
            json={
                "generation_id": str(generation_id),
                "tasks": self._build_task_graph(product, subject, style, shot_type, config),
            },
            headers={"Authorization": f"Bearer {self.api_key}"},
        )
        response.raise_for_status()
        return response.json()["task_id"]

    async def get_task_result(self, task_id: str) -> TaskResult:
        """Fetch task result from compute server."""
        response = await self.client.get(
            f"{self.base_url}/api/v1/tasks/{task_id}/result",
            headers={"Authorization": f"Bearer {self.api_key}"},
        )
        response.raise_for_status()
        return TaskResult(**response.json())
```
Redis Event Listener¶
Listens for task lifecycle events from the Compute Server via a Redis event stream.
```python
# app/services/event_listener.py
class ComputeEventListener:
    def __init__(self, redis: Redis, compute_client: ComputeServerClient):
        self.redis = redis
        self.compute_client = compute_client

    async def start(self):
        """Subscribe to compute server events."""
        async for event in self.redis.listen("compute:events"):
            await self.handle_event(event)

    async def handle_event(self, event: dict):
        """Handle task lifecycle events."""
        if event["type"] == "task.completed":
            result = await self.compute_client.get_task_result(event["task_id"])
            await self.handle_completion(event["generation_id"], result)
        elif event["type"] == "task.failed":
            await self.handle_failure(event["generation_id"], event["error"])
```
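If more lifecycle events are added later, the if/elif chain in `handle_event` can be replaced with a handler registry, so each event type registers its own coroutine. A stdlib-only sketch of that alternative (the decorator, registry, and any event names beyond `task.completed` and `task.failed` are hypothetical, not part of the current listener):

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[None]]
handlers: dict[str, Handler] = {}
handled: list[str] = []   # records what was dispatched, for the demo below


def on(event_type: str) -> Callable[[Handler], Handler]:
    """Register a coroutine as the handler for one event type."""
    def register(fn: Handler) -> Handler:
        handlers[event_type] = fn
        return fn
    return register


@on("task.completed")
async def handle_completed(event: dict) -> None:
    handled.append(f"completed:{event['task_id']}")


@on("task.failed")
async def handle_failed(event: dict) -> None:
    handled.append(f"failed:{event['task_id']}")


async def dispatch(event: dict) -> None:
    handler = handlers.get(event["type"])
    if handler is not None:        # unknown event types are silently dropped
        await handler(event)


asyncio.run(dispatch({"type": "task.completed", "task_id": "t1"}))
asyncio.run(dispatch({"type": "task.unknown", "task_id": "t2"}))
print(handled)  # ['completed:t1']
```

The registry keeps dispatch O(1) per event and lets new event types be added without editing the listener itself.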
Database Schema Overview¶
Core Tables¶
```mermaid
erDiagram
    users }o--o{ organizations : belongs_to
    organizations ||--o{ products : owns
    organizations ||--o{ shootings : owns
    organizations ||--o{ subjects : owns
    organizations ||--o{ guidelines : owns
    shootings ||--o{ shooting_looks : contains
    shooting_looks }o--|| subjects : features
    shooting_looks }o--|| products : main
    shooting_looks }o--o{ guidelines : uses
    shooting_looks ||--o{ shots : produces
    shots ||--o{ generations : triggers
    generations ||--o{ predictions : produces
    media_resource ||--o{ media_resource_attachment : "has many"
    media_resource_attachment }o--|| products : "attaches to"
    media_resource_attachment }o--|| predictions : "attaches to"
    media_resource_attachment }o--|| subjects : "attaches to"
```
Note: `MediaResourceAttachment` uses a polymorphic `entity_type` + `entity_id` pattern rather than direct foreign keys, allowing it to attach to any entity. See MediaResource Lifecycle for the full data model.
Key Indexes¶
| Table | Index | Purpose |
|---|---|---|
| `products` | `organization_id` | Filter products by organization |
| `shootings` | `organization_id, status` | List shootings by status |
| `generations` | `status, created_at` | Queue processing |
| `predictions` | `generation_id` | Fetch predictions |
Configuration¶
Environment Variables¶
| Variable | Description |
|---|---|
| `DATABASE_URL` | PostgreSQL connection string |
| `REDIS_URL` | Redis connection string |
| `COMPUTE_SERVER_API_URL` | Compute Server base URL |
| `COMPUTE_SERVER_ADMIN_TOKEN` | Compute Server admin token |
| `R2_ACCOUNT_ID` | Cloudflare R2 account ID |
| `R2_ACCESS_KEY_ID` | R2 S3 access key ID |
| `R2_SECRET_ACCESS_KEY` | R2 S3 secret access key |
| `R2_BUCKET_NAME` | R2 bucket name |
| `R2_PUBLIC_URL` | Public CDN base URL |
| `R2_ENDPOINT_URL` | S3 endpoint URL (MinIO in dev) |
| `CLOUDFLARE_IMAGE_TRANSFORMATIONS_ENABLED` | Enable CDN-based format conversion |
| `JWT_SECRET` | JWT signing secret |
Key Directories¶
| Path | Purpose |
|---|---|
| `app/api/` | API routes and dependencies |
| `app/services/` | Business logic services |
| `app/services/storage/` | Storage provider (R2 client, presigned URLs, file operations) |
| `app/models/` | SQLModel database models |
| `app/schemas/` | Pydantic request/response schemas |
| `app/tasks/` | Celery background tasks |
| `app/services/media_resource_service.py` | Central media orchestrator (ingestion, dedup, deletion) |
| `app/crud/media_resource.py` | MediaResource CRUD, dead-letter, orphan queries |
| `app/schemas/media_resource.py` | MediaInput, MediaResourcePublic, MediaSlot annotations |
| `app/utils/media_introspection.py` | Schema auto-discovery for media fields |
| `app/utils/image_utils/` | Image processing utilities (persistence, format conversion) |
| `app/websocket/` | WebSocket management |
| `app/config.py` | Application configuration |
| `app/main.py` | FastAPI application factory |
Related Documentation¶
- Compute Server - AI orchestration layer
- Backend Development - Coding standards
- API Reference - Endpoint documentation
- Database Patterns - Database best practices