Compute Server Architecture

The Compute Server is the AI inference orchestration layer of the Sartiq platform. It manages communication with 11+ AI providers and executes complex multi-step generation workflows.


Overview

| Aspect | Details |
|---|---|
| Framework | FastAPI |
| Language | Python |
| Task Queue | Celery with Redis broker |
| Providers | 11+ AI providers (FAL.ai, Vertex AI, custom workers) |
| Port | 9000 |

Architecture Diagram

flowchart TB
    subgraph API["API Layer"]
        Routes[FastAPI Routes]
    end

    subgraph Orchestration["Orchestration Layer"]
        WM[Workflow Manager]
        DAG[DAG Executor]
        TaskRouter[Task Router]
    end

    subgraph Queue["Task Queue"]
        Celery[Celery Workers]
        Redis[(Redis)]
        DB[(PostgreSQL)]
    end

    subgraph Providers["Provider Layer"]
        PA[Provider Abstraction]
        FAL[FAL.ai Client]
        Vertex[Vertex AI Client]
        Custom[Custom Workers]
    end

    subgraph ContentStorage["Content Storage"]
        R2[(Cloudflare R2)]
        CDN[CDN]
    end

    Backend[Backend API] -->|POST /tasks| Routes
    Routes --> WM
    WM --> DAG
    DAG --> TaskRouter
    TaskRouter --> Celery
    Celery --> PA
    PA --> FAL
    PA --> Vertex
    PA --> Custom
    FAL --> R2
    Vertex --> R2
    Custom --> R2
    R2 --> CDN
    Celery -->|emit events| Redis
    Backend -.->|subscribe| Redis
    Backend -->|GET /tasks/{id}/result| Routes

Provider Architecture

The Compute Server abstracts 11+ AI providers behind a unified interface, allowing the platform to use the best provider for each task type.

Provider Abstraction

flowchart LR
    subgraph Interface
        Base[BaseProvider]
    end

    subgraph Implementations
        FAL[FALProvider]
        Vertex[VertexProvider]
        Custom[CustomWorkerProvider]
        Replicate[ReplicateProvider]
    end

    Base --> FAL
    Base --> Vertex
    Base --> Custom
    Base --> Replicate

Provider Interface

# Simplified provider interface
from abc import ABC, abstractmethod

class BaseProvider(ABC):
    @abstractmethod
    async def submit_task(self, task: Task) -> str:
        """Submit task and return task ID."""
        pass

    @abstractmethod
    async def get_result(self, task_id: str) -> TaskResult:
        """Get task result (may poll until complete)."""
        pass

    @abstractmethod
    def supports_task_type(self, task_type: TaskType) -> bool:
        """Check if provider supports the task type."""
        pass

Provider Capabilities

| Provider | Task Types | Notes |
|---|---|---|
| FAL.ai | Generation, Editing, Refine, Video, Upscale | Primary cloud provider, fast inference |
| Vertex AI | Generation, LLM | Google Gemini models for prompts and analysis |
| Custom Workers | Background Removal, Face Enhancement, Garment Enhancement | Local CPU processing |
| Replicate | Specialized models | Backup/alternative models |
| Stability AI | Generation, Upscale | Alternative generation |

Task Types

The Compute Server handles various AI task types, each with specific input/output requirements.

Task Type Overview

| Type | Description | Typical Provider |
|---|---|---|
| GENERATION | Create new images from prompts | FAL.ai, Vertex AI |
| EDITING | Modify existing images | FAL.ai |
| REFINE | Enhance/improve images | FAL.ai |
| UPSCALE | Increase image resolution | FAL.ai, Custom |
| VIDEO | Generate video from images | FAL.ai |
| LLM | Text generation/analysis | Vertex AI |
| BACKGROUND_REMOVAL | Remove image backgrounds | Custom Workers |
| FACE_ENHANCEMENT | Improve face details | Custom Workers |
| GARMENT_ENHANCEMENT | Improve garment details | Custom Workers |
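
These task types could be modeled as a simple string-valued enum. A minimal sketch; the member values shown here are assumptions, not the platform's actual constants:

```python
from enum import Enum

class TaskType(str, Enum):
    """Task types handled by the Compute Server (values are illustrative)."""
    GENERATION = "generation"
    EDITING = "editing"
    REFINE = "refine"
    UPSCALE = "upscale"
    VIDEO = "video"
    LLM = "llm"
    BACKGROUND_REMOVAL = "background_removal"
    FACE_ENHANCEMENT = "face_enhancement"
    GARMENT_ENHANCEMENT = "garment_enhancement"
```

Inheriting from `str` keeps the values JSON-serializable, which is convenient when task definitions cross the Redis queue.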

Task Definition

class Task:
    id: UUID
    type: TaskType
    provider: str | None  # None = auto-select
    inputs: dict          # Task-specific inputs
    config: dict          # Provider-specific config
    priority: int
    timeout: int
    dependencies: list[UUID]  # Tasks that must complete first

Workflow Orchestration

Complex generations often require multiple steps executed in sequence or in parallel. The Compute Server models each workflow as a directed acyclic graph (DAG) of tasks and executes it in dependency order.

Workflow Structure

flowchart TB
    subgraph Workflow["Generation Workflow"]
        T1[Remove Background]
        T2[Generate Base Image]
        T3[Apply Product]
        T4[Enhance Face]
        T5[Enhance Garment]
        T6[Upscale]
        T7[Final Composite]
    end

    T1 --> T3
    T2 --> T3
    T3 --> T4
    T3 --> T5
    T4 --> T7
    T5 --> T7
    T7 --> T6

Workflow Definition

class Workflow:
    id: UUID
    generation_id: UUID    # Backend generation ID
    tasks: list[Task]      # All tasks in workflow
    status: WorkflowStatus
    results: dict[UUID, TaskResult]

def build_generation_workflow(
    generation_id: UUID,
    product: ProductInput,
    subject: SubjectInput,
    style: StyleInput,
    config: GenerationConfig,
) -> Workflow:
    """Build a complete generation workflow."""
    tasks = []

    # Background removal (if needed)
    if config.remove_background:
        bg_task = Task(
            type=TaskType.BACKGROUND_REMOVAL,
            inputs={"image_url": product.primary_image_url},
        )
        tasks.append(bg_task)

    # Base generation
    gen_task = Task(
        type=TaskType.GENERATION,
        inputs={
            "prompt": build_prompt(product, subject, style),
            "subject_base_images": subject.base_images,
            "style_reference": style.reference_url,
        },
        dependencies=[bg_task.id] if config.remove_background else [],
    )
    tasks.append(gen_task)

    # Enhancement tasks (parallel)
    face_task = Task(
        type=TaskType.FACE_ENHANCEMENT,
        inputs={"image": gen_task.id},  # Reference to previous task
        dependencies=[gen_task.id],
    )
    garment_task = Task(
        type=TaskType.GARMENT_ENHANCEMENT,
        inputs={"image": gen_task.id},
        dependencies=[gen_task.id],
    )
    tasks.extend([face_task, garment_task])

    # Final upscale
    upscale_task = Task(
        type=TaskType.UPSCALE,
        inputs={"image": gen_task.id},
        dependencies=[face_task.id, garment_task.id],
    )
    tasks.append(upscale_task)

    return Workflow(
        id=uuid4(),
        generation_id=generation_id,
        tasks=tasks,
    )

DAG Executor

The DAG Executor manages the execution order of tasks based on their dependencies.
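
Determining a valid execution order reduces to a topological sort over the dependency edges. A self-contained sketch using Kahn's algorithm, with tasks reduced to plain ID strings rather than the full Task objects:

```python
from collections import deque

def topological_order(tasks: dict[str, list[str]]) -> list[str]:
    """Order task IDs so every task appears after its dependencies.

    `tasks` maps task_id -> list of dependency task_ids.
    Raises ValueError if the graph contains a cycle.
    """
    # Count unmet dependencies for each task
    indegree = {tid: len(deps) for tid, deps in tasks.items()}
    # Map each task to the tasks that depend on it
    dependents: dict[str, list[str]] = {tid: [] for tid in tasks}
    for tid, deps in tasks.items():
        for dep in deps:
            dependents[dep].append(tid)

    # Start with tasks that have no dependencies (the entry points)
    ready = deque(tid for tid, deg in indegree.items() if deg == 0)
    order = []
    while ready:
        tid = ready.popleft()
        order.append(tid)
        for child in dependents[tid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(tasks):
        raise ValueError("dependency cycle detected")
    return order
```

In practice the executor does not need the full ordering up front; as the implementation below shows, it only tracks which tasks become ready as results arrive, which is the same indegree bookkeeping done incrementally.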

Execution Flow

sequenceDiagram
    participant API
    participant DAG as DAG Executor
    participant Queue as Task Queue
    participant Worker as Celery Worker
    participant Provider

    API->>DAG: Submit workflow
    DAG->>DAG: Topological sort
    DAG->>Queue: Enqueue ready tasks

    loop For each ready task
        Worker->>Queue: Dequeue task
        Worker->>Provider: Execute task
        Provider-->>Worker: Result
        Worker->>DAG: Task complete
        DAG->>DAG: Check dependents
        DAG->>Queue: Enqueue newly ready tasks
    end

    DAG->>API: Workflow complete

Executor Implementation

class DAGExecutor:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def submit_workflow(self, workflow: Workflow):
        """Submit workflow for execution."""
        # Store workflow state
        await self._store_workflow(workflow)

        # Find tasks with no dependencies (entry points)
        ready_tasks = [t for t in workflow.tasks if not t.dependencies]

        # Enqueue ready tasks
        for task in ready_tasks:
            await self._enqueue_task(workflow.id, task)

    async def handle_task_complete(
        self,
        workflow_id: UUID,
        task_id: UUID,
        result: TaskResult,
    ):
        """Handle task completion and enqueue dependents."""
        workflow = await self._get_workflow(workflow_id)

        # Store result
        workflow.results[task_id] = result

        # Find tasks that depend on this one
        dependents = [t for t in workflow.tasks if task_id in t.dependencies]

        for task in dependents:
            # Check if all dependencies are complete
            if all(d in workflow.results for d in task.dependencies):
                # Resolve input references
                resolved_task = self._resolve_inputs(task, workflow.results)
                await self._enqueue_task(workflow_id, resolved_task)

        # Check if workflow is complete
        if len(workflow.results) == len(workflow.tasks):
            await self._complete_workflow(workflow)
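
The `_resolve_inputs` step above swaps task-ID references in a task's inputs for the outputs of completed dependencies. A minimal sketch of that substitution, assuming results expose an "image_url" field and inputs reference dependencies by task ID (the helper name and data shapes are simplifications):

```python
def resolve_inputs(inputs: dict, results: dict) -> dict:
    """Replace task-ID references in `inputs` with completed results.

    `results` maps task_id -> result dict with an "image_url" key.
    Input values that match a completed task ID are swapped for that
    task's output URL; everything else passes through unchanged.
    """
    resolved = {}
    for key, value in inputs.items():
        if isinstance(value, str) and value in results:
            resolved[key] = results[value]["image_url"]
        else:
            resolved[key] = value
    return resolved
```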

Processing Flow

Task Processing

sequenceDiagram
    participant Queue as Redis Queue
    participant Worker as Celery Worker
    participant Router as Task Router
    participant Provider as AI Provider
    participant R2 as Cloudflare R2
    participant Events as Redis Events

    Queue->>Worker: Dequeue task
    Worker->>Router: Route task
    Router->>Router: Select provider
    Router->>Provider: Submit task

    loop Until complete
        Router->>Provider: Poll status
        Provider-->>Router: Status
    end

    Provider-->>Router: Result (image URL)
    Router->>Router: Download from provider
    Router->>Router: Convert to WebP + embed ICC (Display P3)
    Router->>R2: Save to compute/{type}s/{task_id}/
    R2-->>Router: Stored path
    Router->>Worker: Task complete
    Worker->>Events: Emit task.completed
    Note over Events: Backend relocates result<br/>from compute/ to images/

Task Router

class TaskRouter:
    def __init__(self, providers: dict[str, BaseProvider]):
        self.providers = providers

    def select_provider(self, task: Task) -> BaseProvider:
        """Select the best provider for a task."""
        if task.provider:
            # Explicit provider requested
            return self.providers[task.provider]

        # Auto-select based on task type and availability
        candidates = [
            p for p in self.providers.values()
            if p.supports_task_type(task.type)
        ]

        # Prioritize by: cost, speed, quality (configurable)
        return self._rank_providers(candidates, task)[0]

    async def execute_task(self, task: Task) -> TaskResult:
        """Execute a task using the appropriate provider."""
        provider = self.select_provider(task)

        # Submit task
        task_id = await provider.submit_task(task)

        # Wait for result
        result = await provider.get_result(task_id)

        # Store result in object storage
        if result.image_url:
            result.image_url = await self._store_result(result.image_url)

        return result
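
The `_rank_providers` step can be sketched as a sort over per-provider weights. The cost and latency numbers below are illustrative assumptions, not real provider pricing:

```python
def rank_providers(candidates: list[str], weights: dict) -> list[str]:
    """Order candidate provider names by (cost, latency) ascending.

    `weights` maps provider name -> (cost_per_task, avg_latency_s);
    cheaper providers sort first, with latency as the tiebreaker.
    """
    return sorted(candidates, key=lambda name: weights[name])

# Illustrative weights only
weights = {
    "fal": (0.02, 4.0),
    "replicate": (0.03, 6.0),
    "stability": (0.02, 7.0),
}
```

A real ranking would likely also factor in current availability and quality scores per task type, as the comment in `select_provider` suggests.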

Storage & Result Handling

All Compute Server storage keys are prefixed with compute/ to isolate the compute workspace from permanent content.

| Aspect | Details |
|---|---|
| Key prefix | compute/ — automatically prepended by R2ImageStorage |
| Result path | compute/{task_type}s/{task_id}/result_{index}.webp |
| Image format | WebP at 95% quality with Display P3 ICC profile |
| Storage backend | R2 when configured (R2_ENDPOINT_URL + R2_ACCESS_KEY_ID); falls back to local disk |
| Result relocation | Backend copies accepted results from compute/ to images/ via S3 CopyObject |
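
Under these conventions, building a result key is a simple string template. A sketch; the helper name is hypothetical:

```python
def build_result_key(task_type: str, task_id: str, index: int) -> str:
    """Build a compute-workspace key following the documented layout:
    compute/{task_type}s/{task_id}/result_{index}.webp
    """
    return f"compute/{task_type}s/{task_id}/result_{index}.webp"
```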

Celery Workers

Worker Configuration

# celery_app.py
from celery import Celery

app = Celery(
    "compute",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_routes={
        "compute.tasks.high_priority.*": {"queue": "high-priority"},
        "compute.tasks.generation.*": {"queue": "generation"},
        "compute.tasks.processing.*": {"queue": "processing"},
    },
    task_default_queue="default",
    worker_prefetch_multiplier=1,
    task_acks_late=True,
)

Task Queues

| Queue | Purpose | Concurrency |
|---|---|---|
| high-priority | Urgent tasks | High |
| generation | AI generation tasks | Limited by provider |
| processing | CPU-bound tasks | Per-core |
| default | General tasks | Medium |

Task Implementation

# tasks/generation.py
import asyncio

from celery import shared_task

from compute.router import TaskRouter

@shared_task(bind=True, max_retries=3)
def execute_task(self, workflow_id: str, task_data: dict):
    """Execute a single task in a workflow."""
    task = Task(**task_data)
    try:
        router = TaskRouter.get_instance()

        # Celery tasks are synchronous, so run the async router
        # inside an event loop
        result = asyncio.run(router.execute_task(task))

        # Notify DAG executor
        dag_executor.handle_task_complete(
            workflow_id=workflow_id,
            task_id=task.id,
            result=result,
        )

    except ProviderError as exc:
        # Retry with linear backoff: 60s, 120s, 180s
        raise self.retry(exc=exc, countdown=60 * (self.request.retries + 1))
    except Exception as exc:
        # Mark task as failed so dependents are not enqueued
        dag_executor.handle_task_failed(
            workflow_id=workflow_id,
            task_id=task.id,
            error=str(exc),
        )

Redis Event Emission

When a task completes (or fails), the Compute Server emits events to a Redis event stream that the Backend subscribes to.

Event Types

| Event | Description |
|---|---|
| task.started | Task execution began |
| task.progress | Task progress update |
| task.completed | Task finished successfully |
| task.failed | Task failed with error |

Event Payload

class TaskEvent:
    event: Literal["task.started", "task.progress", "task.completed", "task.failed"]
    task_id: UUID
    generation_id: UUID
    progress: dict | None     # For progress events
    error: str | None         # For failed events

Event Emission Implementation

async def emit_task_event(redis: Redis, event: TaskEvent):
    """Emit task lifecycle event to Redis stream."""
    await redis.xadd(
        "compute:events",
        {
            "event": event.event,
            "task_id": str(event.task_id),
            "generation_id": str(event.generation_id),
            "payload": json.dumps(event.dict()),
        },
    )
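
On the consuming side, the Backend reads compute:events with XREAD and decodes each entry's fields. A minimal sketch of the decode step; the field names follow the emitter above, but the helper itself is hypothetical:

```python
import json

def decode_task_event(fields: dict[bytes, bytes]) -> dict:
    """Decode one entry read from the compute:events stream.

    Redis returns field names and values as bytes; the full event
    payload is stored JSON-encoded under "payload".
    """
    decoded = {k.decode() for k in ()} or {}  # placeholder removed below
    decoded = {k.decode(): v.decode() for k, v in fields.items()}
    return {
        "event": decoded["event"],
        "task_id": decoded["task_id"],
        "generation_id": decoded["generation_id"],
        "payload": json.loads(decoded["payload"]),
    }
```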

Result Retrieval

The Backend fetches full results via REST API after receiving a completion event:

GET /api/v1/tasks/{task_id}/result
Authorization: Bearer {service_token}

Response:

{
  "task_id": "uuid",
  "status": "completed",
  "predictions": [
    {
      "id": "uuid",
      "image_url": "https://media.sartiq.com/compute/generations/...",
      "metadata": {"seed": 12345, "model": "flux-dev"}
    }
  ],
  "timing": {"total_ms": 15000, "queue_ms": 500}
}
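
A consumer of this endpoint might pull the prediction URLs out of the response like so. A sketch against the response shape above; the helper is hypothetical:

```python
def extract_image_urls(result: dict) -> list[str]:
    """Collect image URLs from a completed task result payload.

    Returns an empty list for tasks that are not yet completed.
    """
    if result.get("status") != "completed":
        return []
    return [p["image_url"] for p in result.get("predictions", [])]
```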

Provider Details

FAL.ai Integration

Primary provider for most generation tasks.

class FALProvider(BaseProvider):
    def __init__(self, api_key: str):
        # Async client so submit/result can be awaited
        self.client = fal_client.AsyncClient(key=api_key)

    async def submit_task(self, task: Task) -> str:
        if task.type == TaskType.GENERATION:
            handle = await self.client.submit(
                "fal-ai/flux/dev",
                arguments={
                    "prompt": task.inputs["prompt"],
                    "image_size": task.config.get("size", "landscape_16_9"),
                    "num_images": task.config.get("num_images", 1),
                },
            )
            return handle.request_id
        raise ValueError(f"FALProvider does not handle {task.type}")

    async def get_result(self, task_id: str) -> TaskResult:
        result = await self.client.result("fal-ai/flux/dev", task_id)
        return TaskResult(
            success=True,
            image_url=result["images"][0]["url"],
            metadata={"seed": result.get("seed")},
        )

Vertex AI Integration

Google's Gemini models for LLM tasks.

class VertexProvider(BaseProvider):
    def __init__(self, project_id: str):
        # vertexai.init configures the SDK globally; it returns nothing
        vertexai.init(project=project_id)

    async def submit_task(self, task: Task) -> str:
        if task.type == TaskType.LLM:
            model = GenerativeModel("gemini-pro")
            response = await model.generate_content_async(
                task.inputs["prompt"]
            )
            # LLM calls complete inline, so the generated text is
            # returned directly rather than an ID to poll later
            return response.text

Custom Workers

Local CPU workers for specialized processing.

class CustomWorkerProvider(BaseProvider):
    """Handles CPU-bound tasks locally."""

    async def submit_task(self, task: Task) -> str:
        if task.type == TaskType.BACKGROUND_REMOVAL:
            # Runs inline on local CPU (e.g. via the rembg library);
            # the stored result path doubles as the task ID
            result = remove_background(task.inputs["image_url"])
            return await self._store_result(result)

Key Directories

| Directory | Purpose |
|---|---|
| app/api/ | FastAPI routes (tasks, storage, training, admin) |
| app/services/task/ | Task processing, scheduling, and workflow execution |
| app/services/storage/ | Storage implementations (R2 and disk backends) |
| app/services/workflow/ | Workflow management and DAG execution |
| app/services/preprocessing/ | Image preprocessing utilities |
| app/tasks/ | Celery task definitions |
| app/utils/images/ | Image conversion, ICC profiles, saving strategies |
| app/core/ | Configuration and application setup |