# Backend API Architecture

The Backend API is the central orchestration layer of the Sartiq platform, handling all business logic, data management, and coordination between system components.


## Overview

| Aspect | Details |
| --- | --- |
| Framework | FastAPI |
| Language | Python 3.12 |
| ORM | SQLModel + SQLAlchemy |
| Task Queue | Celery with Redis broker |
| Scheduler | APScheduler |
| Port | 8000 |

## Architecture Diagram

```mermaid
flowchart TB
    subgraph API["API Layer"]
        Routes[FastAPI Routes]
        WS[WebSocket Manager]
        Auth[Auth Middleware]
    end

    subgraph Services["Service Layer"]
        GenService[Generation Service]
        ShootingService[Shooting Service]
        ProductService[Product Service]
        ExportService[Export Service]
        ComputeClient[Compute Client]
    end

    subgraph Data["Data Layer"]
        Models[SQLModel Models]
        Repos[Repositories]
    end

    subgraph Background["Background Processing"]
        Celery[Celery Workers]
        Scheduler[APScheduler]
    end

    subgraph External["External"]
        Compute[Compute Server]
        DB[(PostgreSQL)]
        Redis[(Redis)]
        R2[(Cloudflare R2)]
    end

    Routes --> Auth
    Auth --> Services
    WS --> Redis
    Services --> Repos
    Repos --> Models
    Models --> DB
    Services --> Celery
    Services --> ComputeClient
    ComputeClient --> Compute
    Celery --> Redis
    Scheduler --> Services
    Services --> R2
```

## Layered Architecture

The backend follows a clean layered architecture with clear separation of concerns.

### Layer Overview

| Layer | Location | Responsibility |
| --- | --- | --- |
| Routes | `app/api/routes/` | HTTP endpoints, request/response handling |
| Services | `app/services/` | Business logic, orchestration |
| Models | `app/models/` | Database entities (SQLModel) |
| Schemas | `app/schemas/` | Request/response DTOs (Pydantic) |
| Repositories | `app/repositories/` | Data access patterns |

### Dependency Flow

```mermaid
flowchart TB
    Routes[Routes Layer]
    Services[Service Layer]
    Repos[Repository Layer]
    Models[Model Layer]
    DB[(Database)]

    Routes --> Services
    Services --> Repos
    Repos --> Models
    Models --> DB
```

## Routes Layer

HTTP endpoints organized by domain.

### Route Structure

```text
app/api/routes/
├── __init__.py
├── auth.py              # Authentication endpoints
├── organizations.py     # Organization management
├── products.py          # Product CRUD
├── subjects.py          # Subject management
├── shootings.py         # Shooting management
├── shooting_looks.py    # Shooting look operations
├── generations.py       # Generation control
├── gallery.py           # Image gallery
├── exports.py           # Export operations
├── uploads.py           # Presigned URL generation, file uploads
├── files.py             # File serving (302 redirect to CDN)
├── bulk_upload.py       # CSV bulk import
├── styles.py            # Style management
├── shots.py             # Shot management
├── captions.py          # Caption generation
└── admin/               # Admin-only routes
    ├── users.py
    └── settings.py
```

### Route Pattern Example

```python
# app/api/routes/products.py
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException

from app.api.deps import get_current_user, get_product_service
from app.models.user import User
from app.schemas.product import ProductCreate, ProductRead
from app.services.product import ProductService

router = APIRouter(prefix="/products", tags=["products"])

@router.post("/", response_model=ProductRead)
async def create_product(
    product_in: ProductCreate,
    current_user: User = Depends(get_current_user),
    service: ProductService = Depends(get_product_service),
):
    """Create a new product."""
    return await service.create(product_in, organization_id=current_user.organization_id)

@router.get("/{product_id}", response_model=ProductRead)
async def get_product(
    product_id: UUID,
    current_user: User = Depends(get_current_user),
    service: ProductService = Depends(get_product_service),
):
    """Get a product by ID."""
    product = await service.get(product_id)
    if not product or product.organization_id != current_user.organization_id:
        raise HTTPException(status_code=404, detail="Product not found")
    return product
```

## Service Layer

Business logic encapsulated in service classes.

### Key Services

| Service | Responsibility |
| --- | --- |
| GenerationWorkflowService | Orchestrates generation lifecycle |
| ShootingService | Shooting CRUD and status management |
| ShootingLookService | Look configuration and shot management |
| ProductService | Product catalog operations |
| SubjectService | Subject management |
| ExportService | Export to external systems |
| MediaResourceService | File ingestion, deduplication, deletion, orphan scanning |
| ComputeServerClient | Communication with Compute Server |

### Service Pattern

```python
# app/services/generation_workflow.py
class GenerationWorkflowService:
    def __init__(
        self,
        db: AsyncSession,
        compute_client: ComputeServerClient,
        redis: Redis,
    ):
        self.db = db
        self.compute_client = compute_client
        self.redis = redis

    async def start_generation(
        self,
        shooting_look_id: UUID,
        shot_type_id: UUID,
        config: GenerationConfig,
    ) -> Generation:
        """Start a new generation workflow."""
        # 1. Validate inputs
        look = await self._get_look(shooting_look_id)
        shot_type = await self._get_shot_type(shot_type_id)

        # 2. Create generation record
        generation = await self._create_generation(look, shot_type, config)

        # 3. Submit to compute server
        workflow_id = await self.compute_client.submit_task(
            generation_id=generation.id,
            product=look.main_product,
            subject=look.subject,
            style=look.style,
            shot_type=shot_type,
            config=config,
        )

        # 4. Update generation with workflow ID
        generation.workflow_id = workflow_id
        await self.db.commit()

        # 5. Publish event
        await self._publish_event("generation.started", generation)

        return generation
```
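
Step 5's `_publish_event` helper is not shown in the excerpt. One plausible shape, assuming a JSON envelope published over Redis pub/sub — the field names and channel convention here are guesses, not the real schema:

```python
import json
from datetime import datetime, timezone

def build_event(event_type: str, generation_id: str) -> str:
    """Serialize a minimal event envelope (field names are assumptions)."""
    payload = {
        "type": event_type,
        "generation_id": generation_id,
        "emitted_at": datetime.now(timezone.utc).isoformat(),
    }
    return json.dumps(payload)

# Inside the service this would be published roughly as:
#   await self.redis.publish(channel, build_event("generation.started", str(generation.id)))
message = build_event("generation.started", "3f2504e0-4f89-11d3-9a0c-0305e82c3301")
```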

## Models Layer

Database entities using SQLModel (SQLAlchemy + Pydantic).

### Model Structure

```text
app/models/
├── __init__.py
├── base.py              # Base model with common fields
├── organization.py
├── user.py
├── product.py
├── subject.py
├── shooting.py
├── shooting_look.py
├── shot.py
├── generation.py
├── prediction.py
├── style.py
├── guideline.py
├── shot_type.py
└── media_resource.py    # MediaResource, MediaResourceAttachment, MediaDeletionDeadLetter
```

### Model Example

```python
# app/models/generation.py
from datetime import datetime
from enum import Enum
from uuid import UUID, uuid4

from sqlalchemy import JSON, Column
from sqlmodel import Field, Relationship, SQLModel

class GenerationStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class Generation(SQLModel, table=True):
    id: UUID = Field(default_factory=uuid4, primary_key=True)
    shot_id: UUID = Field(foreign_key="shot.id")
    workflow_id: str | None = None
    status: GenerationStatus = GenerationStatus.PENDING
    config: dict = Field(default_factory=dict, sa_column=Column(JSON))
    error_message: str | None = None
    created_at: datetime = Field(default_factory=datetime.utcnow)
    completed_at: datetime | None = None

    # Relationships
    shot: "Shot" = Relationship(back_populates="generations")
    predictions: list["Prediction"] = Relationship(back_populates="generation")
```
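
The `GenerationStatus` enum implies a small state machine. A hedged sketch of the transitions a service might enforce — the allowed set below is an assumption; the real rules may differ (e.g. retries from `FAILED`):

```python
from enum import Enum

class GenerationStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

# Assumed transition rules; terminal states have no outgoing edges.
ALLOWED_TRANSITIONS = {
    GenerationStatus.PENDING: {GenerationStatus.PROCESSING, GenerationStatus.FAILED},
    GenerationStatus.PROCESSING: {GenerationStatus.COMPLETED, GenerationStatus.FAILED},
    GenerationStatus.COMPLETED: set(),
    GenerationStatus.FAILED: set(),
}

def can_transition(current: GenerationStatus, new: GenerationStatus) -> bool:
    """Return True if moving from `current` to `new` is a legal step."""
    return new in ALLOWED_TRANSITIONS[current]
```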

## Schemas Layer

Request/response data transfer objects.

### Schema Organization

```text
app/schemas/
├── __init__.py
├── base.py              # Common schemas
├── product.py           # ProductCreate, ProductRead, ProductUpdate
├── shooting.py
├── generation.py
└── pagination.py        # Paginated response schemas
```

### Schema Pattern

```python
# app/schemas/product.py
from datetime import datetime
from uuid import UUID

from pydantic import BaseModel, ConfigDict

class ProductBase(BaseModel):
    name: str
    sku: str | None = None
    category: str | None = None
    metadata: dict = {}

class ProductCreate(ProductBase):
    """Schema for creating a product."""

class ProductUpdate(BaseModel):
    """Schema for updating a product (all fields optional)."""
    name: str | None = None
    sku: str | None = None
    category: str | None = None
    metadata: dict | None = None

class ProductRead(ProductBase):
    """Schema for reading a product."""
    model_config = ConfigDict(from_attributes=True)

    id: UUID
    organization_id: UUID
    primary_image_url: str | None
    created_at: datetime
    updated_at: datetime
```

## Background Processing

### Celery Workers

Asynchronous task processing for long-running operations.

```mermaid
flowchart LR
    subgraph API
        Routes[API Routes]
    end

    subgraph Queue["Redis Queue"]
        Q1[default]
        Q2[high-priority]
        Q3[low-priority]
    end

    subgraph Workers["Celery Workers"]
        W1[Worker 1]
        W2[Worker 2]
        W3[Worker 3]
    end

    Routes -->|enqueue| Q1
    Routes -->|enqueue| Q2
    Q1 --> W1
    Q1 --> W2
    Q2 --> W1
    Q3 --> W3
```

#### Task Types

| Queue | Tasks |
| --- | --- |
| high-priority | Event processing, real-time updates |
| default | Standard background operations |
| low-priority | Batch exports, cleanup tasks |
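
One way to realize this queue split is Celery's `task_routes` setting. The task-name globs below are invented for illustration and will not match the real module layout exactly:

```python
# Hypothetical Celery routing for the queues above.
task_routes = {
    "app.tasks.events.*": {"queue": "high-priority"},
    "app.tasks.exports.*": {"queue": "low-priority"},
}
task_default_queue = "default"  # anything unmatched falls through to `default`
```

Workers then subscribe to specific queues, e.g. `celery worker -Q high-priority,default`.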

#### Task Example

```python
# app/tasks/generations.py
import asyncio

from celery import shared_task

from app.services.generation_workflow import GenerationWorkflowService

@shared_task(bind=True, max_retries=3)
def process_generation_complete(self, generation_id: str, result: dict):
    """Process generation completion from compute server."""
    try:
        service = GenerationWorkflowService(...)
        # The service API is async, so drive it to completion from the sync task
        asyncio.run(service.handle_completion(generation_id, result))
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)
```

### APScheduler

Periodic tasks and scheduled jobs.

```python
# app/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler()

@scheduler.scheduled_job('interval', hours=1)
async def cleanup_expired_sessions():
    """Remove expired user sessions."""
    ...

@scheduler.scheduled_job('cron', hour=2)
async def daily_usage_report():
    """Generate daily usage reports."""
    ...
```

## Real-Time Updates

### WebSocket + Redis Pub/Sub

```mermaid
sequenceDiagram
    participant Client
    participant WS as WebSocket
    participant Backend
    participant Redis
    participant Compute

    Client->>WS: Connect
    WS->>Backend: Authenticate
    Backend->>Redis: Subscribe to user channel

    Note over Compute: Generation completes
    Compute->>Redis: Emit task.completed
    Redis->>Backend: Event received
    Backend->>Compute: GET /tasks/{id}/result
    Compute-->>Backend: Task result
    Backend->>Redis: Publish to user channel
    Redis->>Backend: Event received
    Backend->>WS: Send to client
    WS->>Client: generation.completed
```

### WebSocket Manager

```python
# app/websocket/manager.py
import json

from fastapi import WebSocket
from redis.asyncio import Redis

class WebSocketManager:
    def __init__(self, redis: Redis):
        self.redis = redis
        self.pubsub = redis.pubsub()
        self.connections: dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.connections[user_id] = websocket
        # Subscribe to the user's channel
        await self.pubsub.subscribe(f"user:{user_id}")

    async def broadcast_to_user(self, user_id: str, event: dict):
        if ws := self.connections.get(user_id):
            await ws.send_json(event)

    async def handle_redis_messages(self):
        """Listen for Redis pub/sub messages and forward them to WebSockets."""
        async for message in self.pubsub.listen():
            if message["type"] != "message":
                continue  # skip subscribe/unsubscribe confirmations
            user_id = message["channel"].split(":", 1)[1]
            await self.broadcast_to_user(user_id, json.loads(message["data"]))
```

## External Integrations

### Compute Server Client

Communicates with the Compute Server for AI operations.

```python
# app/services/compute_client.py
from uuid import UUID

import httpx

class ComputeServerClient:
    def __init__(self, base_url: str, api_key: str):
        self.base_url = base_url
        self.api_key = api_key
        self.client = httpx.AsyncClient()

    async def submit_task(
        self,
        generation_id: UUID,
        product: Product,
        subject: Subject,
        style: Style,
        shot_type: ShotType,
        config: GenerationConfig,
    ) -> str:
        """Submit a generation task to the compute server."""
        response = await self.client.post(
            f"{self.base_url}/api/v1/tasks",
            json={
                "generation_id": str(generation_id),
                "tasks": self._build_task_graph(product, subject, style, shot_type, config),
            },
            headers={"Authorization": f"Bearer {self.api_key}"},
        )
        response.raise_for_status()
        return response.json()["task_id"]

    async def get_task_result(self, task_id: str) -> TaskResult:
        """Fetch task result from compute server."""
        response = await self.client.get(
            f"{self.base_url}/api/v1/tasks/{task_id}/result",
            headers={"Authorization": f"Bearer {self.api_key}"},
        )
        response.raise_for_status()
        return TaskResult(**response.json())
```

### Redis Event Listener

Listens for task lifecycle events from the Compute Server via a Redis event stream.

```python
# app/services/event_listener.py
import json

from redis.asyncio import Redis

class ComputeEventListener:
    def __init__(self, redis: Redis, compute_client: ComputeServerClient):
        self.redis = redis
        self.compute_client = compute_client

    async def start(self):
        """Subscribe to compute server events and dispatch them as they arrive."""
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("compute:events")
        async for message in pubsub.listen():
            if message["type"] != "message":
                continue  # skip subscribe confirmations
            await self.handle_event(json.loads(message["data"]))

    async def handle_event(self, event: dict):
        """Handle task lifecycle events."""
        if event["type"] == "task.completed":
            result = await self.compute_client.get_task_result(event["task_id"])
            await self.handle_completion(event["generation_id"], result)
        elif event["type"] == "task.failed":
            await self.handle_failure(event["generation_id"], event["error"])
```

## Database Schema Overview

### Core Tables

```mermaid
erDiagram
    users }o--o{ organizations : belongs_to
    organizations ||--o{ products : owns
    organizations ||--o{ shootings : owns
    organizations ||--o{ subjects : owns
    organizations ||--o{ guidelines : owns

    shootings ||--o{ shooting_looks : contains
    shooting_looks }o--|| subjects : features
    shooting_looks }o--|| products : main
    shooting_looks }o--o{ guidelines : uses

    shooting_looks ||--o{ shots : produces
    shots ||--o{ generations : triggers
    generations ||--o{ predictions : produces

    media_resource ||--o{ media_resource_attachment : "has many"
    media_resource_attachment }o--|| products : "attaches to"
    media_resource_attachment }o--|| predictions : "attaches to"
    media_resource_attachment }o--|| subjects : "attaches to"
```
> **Note:** MediaResourceAttachment uses a polymorphic entity_type + entity_id pattern rather than direct foreign keys, allowing it to attach to any entity. See MediaResource Lifecycle for the full data model.
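
A stdlib-only illustration of that polymorphic pattern — the field names are assumptions based on the note above, and the real table lives behind SQLModel, not dataclasses:

```python
from dataclasses import dataclass

@dataclass
class MediaResourceAttachment:
    media_resource_id: str
    entity_type: str  # e.g. "product", "subject", "prediction"
    entity_id: str

def attachments_for(attachments: list[MediaResourceAttachment],
                    entity_type: str, entity_id: str) -> list[MediaResourceAttachment]:
    """Resolve attachments for one entity; no per-entity foreign key needed."""
    return [a for a in attachments
            if a.entity_type == entity_type and a.entity_id == entity_id]

rows = [
    MediaResourceAttachment("m1", "product", "p1"),
    MediaResourceAttachment("m2", "product", "p2"),
    MediaResourceAttachment("m1", "subject", "s1"),
]
```

The trade-off of this pattern is that the database cannot enforce referential integrity on `entity_id`, so orphan scanning (handled by MediaResourceService) becomes an application concern.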

### Key Indexes

| Table | Index | Purpose |
| --- | --- | --- |
| products | `organization_id` | Filter products by organization |
| shootings | `organization_id, status` | List shootings by status |
| generations | `status, created_at` | Queue processing |
| predictions | `generation_id` | Fetch predictions |

## Configuration

### Environment Variables

| Variable | Description |
| --- | --- |
| `DATABASE_URL` | PostgreSQL connection string |
| `REDIS_URL` | Redis connection string |
| `COMPUTE_SERVER_API_URL` | Compute Server base URL |
| `COMPUTE_SERVER_ADMIN_TOKEN` | Compute Server admin token |
| `R2_ACCOUNT_ID` | Cloudflare R2 account ID |
| `R2_ACCESS_KEY_ID` | R2 S3 access key ID |
| `R2_SECRET_ACCESS_KEY` | R2 S3 secret access key |
| `R2_BUCKET_NAME` | R2 bucket name |
| `R2_PUBLIC_URL` | Public CDN base URL |
| `R2_ENDPOINT_URL` | S3 endpoint URL (MinIO in dev) |
| `CLOUDFLARE_IMAGE_TRANSFORMATIONS_ENABLED` | Enable CDN-based format conversion |
| `JWT_SECRET` | JWT signing secret |
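
How these variables reach the application is centralized in `app/config.py`. A stdlib-only sketch of the idea — the real module likely uses a settings library, and the attribute names below are assumptions:

```python
import os
from dataclasses import dataclass

@dataclass(frozen=True)
class Settings:
    database_url: str
    redis_url: str
    jwt_secret: str

def load_settings(env=os.environ) -> Settings:
    """Fail fast (KeyError) at startup if a required variable is missing."""
    return Settings(
        database_url=env["DATABASE_URL"],
        redis_url=env["REDIS_URL"],
        jwt_secret=env["JWT_SECRET"],
    )
```

Reading everything once at startup, rather than `os.environ` lookups scattered through the code, means a misconfigured deployment fails immediately instead of mid-request.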

### Key Directories

| Path | Purpose |
| --- | --- |
| `app/api/` | API routes and dependencies |
| `app/services/` | Business logic services |
| `app/services/storage/` | Storage provider (R2 client, presigned URLs, file operations) |
| `app/models/` | SQLModel database models |
| `app/schemas/` | Pydantic request/response schemas |
| `app/tasks/` | Celery background tasks |
| `app/services/media_resource_service.py` | Central media orchestrator (ingestion, dedup, deletion) |
| `app/crud/media_resource.py` | MediaResource CRUD, dead-letter, orphan queries |
| `app/schemas/media_resource.py` | MediaInput, MediaResourcePublic, MediaSlot annotations |
| `app/utils/media_introspection.py` | Schema auto-discovery for media fields |
| `app/utils/image_utils/` | Image processing utilities (persistence, format conversion) |
| `app/websocket/` | WebSocket management |
| `app/config.py` | Application configuration |
| `app/main.py` | FastAPI application factory |