# Compute Server Architecture
The Compute Server is the AI inference orchestration layer of the Sartiq platform. It manages communication with 11+ AI providers and executes complex multi-step generation workflows.
## Overview
| Aspect | Details |
|---|---|
| Framework | FastAPI |
| Language | Python |
| Task Queue | Celery with Redis broker |
| Providers | 11+ AI providers (FAL.ai, Vertex AI, custom workers) |
| Port | 9000 |
## Architecture Diagram

```mermaid
flowchart TB
    subgraph API["API Layer"]
        Routes[FastAPI Routes]
    end
    subgraph Orchestration["Orchestration Layer"]
        WM[Workflow Manager]
        DAG[DAG Executor]
        TaskRouter[Task Router]
    end
    subgraph Queue["Task Queue"]
        Celery[Celery Workers]
        Redis[(Redis)]
        DB[(PostgreSQL)]
    end
    subgraph Providers["Provider Layer"]
        PA[Provider Abstraction]
        FAL[FAL.ai Client]
        Vertex[Vertex AI Client]
        Custom[Custom Workers]
    end
    subgraph ContentStorage["Content Storage"]
        R2[(Cloudflare R2)]
        CDN[CDN]
    end
    Backend[Backend API] -->|POST /tasks| Routes
    Routes --> WM
    WM --> DAG
    DAG --> TaskRouter
    TaskRouter --> Celery
    Celery --> PA
    PA --> FAL
    PA --> Vertex
    PA --> Custom
    FAL --> R2
    Vertex --> R2
    Custom --> R2
    R2 --> CDN
    Celery -->|emit events| Redis
    Backend -.->|subscribe| Redis
    Backend -->|GET /tasks/{id}/result| Routes
```
## Provider Architecture
The Compute Server abstracts 11+ AI providers behind a unified interface, allowing the platform to use the best provider for each task type.
### Provider Abstraction

```mermaid
flowchart LR
    subgraph Interface
        Base[BaseProvider]
    end
    subgraph Implementations
        FAL[FALProvider]
        Vertex[VertexProvider]
        Custom[CustomWorkerProvider]
        Replicate[ReplicateProvider]
    end
    Base --> FAL
    Base --> Vertex
    Base --> Custom
    Base --> Replicate
```
### Provider Interface

```python
# Simplified provider interface
from abc import ABC, abstractmethod

class BaseProvider(ABC):
    @abstractmethod
    async def submit_task(self, task: Task) -> str:
        """Submit task and return task ID."""
        pass

    @abstractmethod
    async def get_result(self, task_id: str) -> TaskResult:
        """Get task result (may poll until complete)."""
        pass

    @abstractmethod
    def supports_task_type(self, task_type: TaskType) -> bool:
        """Check if provider supports the task type."""
        pass
```
### Provider Capabilities
| Provider | Task Types | Notes |
|---|---|---|
| FAL.ai | Generation, Editing, Refine, Video, Upscale | Primary cloud provider, fast inference |
| Vertex AI | Generation, LLM | Google Gemini models for prompts and analysis |
| Custom Workers | Background Removal, Face Enhancement, Garment Enhancement | Local CPU processing |
| Replicate | Specialized models | Backup/alternative models |
| Stability AI | Generation, Upscale | Alternative generation |
## Task Types
The Compute Server handles various AI task types, each with specific input/output requirements.
### Task Type Overview
| Type | Description | Typical Provider |
|---|---|---|
| GENERATION | Create new images from prompts | FAL.ai, Vertex AI |
| EDITING | Modify existing images | FAL.ai |
| REFINE | Enhance/improve images | FAL.ai |
| UPSCALE | Increase image resolution | FAL.ai, Custom |
| VIDEO | Generate video from images | FAL.ai |
| LLM | Text generation/analysis | Vertex AI |
| BACKGROUND_REMOVAL | Remove image backgrounds | Custom Workers |
| FACE_ENHANCEMENT | Improve face details | Custom Workers |
| GARMENT_ENHANCEMENT | Improve garment details | Custom Workers |
### Task Definition

```python
class Task:
    id: UUID
    type: TaskType
    provider: str | None      # None = auto-select
    inputs: dict              # Task-specific inputs
    config: dict              # Provider-specific config
    priority: int
    timeout: int
    dependencies: list[UUID]  # Tasks that must complete first
```
## Workflow Orchestration
Complex generations often require multiple steps executed in sequence or parallel. The Compute Server uses DAG (Directed Acyclic Graph) execution for workflow orchestration.
### Workflow Structure

```mermaid
flowchart TB
    subgraph Workflow["Generation Workflow"]
        T1[Remove Background]
        T2[Generate Base Image]
        T3[Apply Product]
        T4[Enhance Face]
        T5[Enhance Garment]
        T6[Upscale]
        T7[Final Composite]
    end
    T1 --> T3
    T2 --> T3
    T3 --> T4
    T3 --> T5
    T4 --> T7
    T5 --> T7
    T7 --> T6
```
### Workflow Definition

```python
class Workflow:
    id: UUID
    generation_id: UUID              # Backend generation ID
    tasks: list[Task]                # All tasks in workflow
    status: WorkflowStatus
    results: dict[UUID, TaskResult]


def build_generation_workflow(
    generation_id: UUID,
    product: ProductInput,
    subject: SubjectInput,
    style: StyleInput,
    config: GenerationConfig,
) -> Workflow:
    """Build a complete generation workflow."""
    tasks = []

    # Background removal (if needed)
    if config.remove_background:
        bg_task = Task(
            type=TaskType.BACKGROUND_REMOVAL,
            inputs={"image_url": product.primary_image_url},
        )
        tasks.append(bg_task)

    # Base generation
    gen_task = Task(
        type=TaskType.GENERATION,
        inputs={
            "prompt": build_prompt(product, subject, style),
            "subject_base_images": subject.base_images,
            "style_reference": style.reference_url,
        },
        dependencies=[bg_task.id] if config.remove_background else [],
    )
    tasks.append(gen_task)

    # Enhancement tasks (run in parallel)
    face_task = Task(
        type=TaskType.FACE_ENHANCEMENT,
        inputs={"image": gen_task.id},  # Reference to previous task's output
        dependencies=[gen_task.id],
    )
    garment_task = Task(
        type=TaskType.GARMENT_ENHANCEMENT,
        inputs={"image": gen_task.id},
        dependencies=[gen_task.id],
    )
    tasks.extend([face_task, garment_task])

    # Final upscale (composite step omitted here for brevity)
    upscale_task = Task(
        type=TaskType.UPSCALE,
        inputs={"image": gen_task.id},
        dependencies=[face_task.id, garment_task.id],
    )
    tasks.append(upscale_task)

    return Workflow(
        id=uuid4(),
        generation_id=generation_id,
        tasks=tasks,
    )
```
## DAG Executor
The DAG Executor manages the execution order of tasks based on their dependencies.
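The core of this is computing which tasks are ready to run. A minimal sketch of the dependency check (a Kahn-style frontier computation; function and variable names are illustrative, not the actual implementation):

```python
from uuid import UUID, uuid4

def ready_tasks(
    dependencies: dict[UUID, set[UUID]],
    completed: set[UUID],
) -> set[UUID]:
    """Return tasks whose dependencies have all completed."""
    return {
        task_id
        for task_id, deps in dependencies.items()
        if task_id not in completed and deps <= completed
    }

# Example: B and C depend on A; D depends on B and C
a, b, c, d = uuid4(), uuid4(), uuid4(), uuid4()
graph = {a: set(), b: {a}, c: {a}, d: {b, c}}

assert ready_tasks(graph, completed=set()) == {a}
assert ready_tasks(graph, completed={a}) == {b, c}
assert ready_tasks(graph, completed={a, b, c}) == {d}
```

Calling this after every completion event, rather than sorting the whole graph up front, is what lets independent branches (like the two enhancement tasks) run in parallel.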
### Execution Flow

```mermaid
sequenceDiagram
    participant API
    participant DAG as DAG Executor
    participant Queue as Task Queue
    participant Worker as Celery Worker
    participant Provider

    API->>DAG: Submit workflow
    DAG->>DAG: Topological sort
    DAG->>Queue: Enqueue ready tasks
    loop For each ready task
        Worker->>Queue: Dequeue task
        Worker->>Provider: Execute task
        Provider-->>Worker: Result
        Worker->>DAG: Task complete
        DAG->>DAG: Check dependents
        DAG->>Queue: Enqueue newly ready tasks
    end
    DAG->>API: Workflow complete
```
### Executor Implementation

```python
class DAGExecutor:
    def __init__(self, redis: Redis):
        self.redis = redis

    async def submit_workflow(self, workflow: Workflow):
        """Submit workflow for execution."""
        # Store workflow state
        await self._store_workflow(workflow)

        # Find tasks with no dependencies (entry points)
        ready_tasks = [t for t in workflow.tasks if not t.dependencies]

        # Enqueue ready tasks
        for task in ready_tasks:
            await self._enqueue_task(workflow.id, task)

    async def handle_task_complete(
        self,
        workflow_id: UUID,
        task_id: UUID,
        result: TaskResult,
    ):
        """Handle task completion and enqueue dependents."""
        workflow = await self._get_workflow(workflow_id)

        # Store result
        workflow.results[task_id] = result

        # Find tasks that depend on this one
        dependents = [t for t in workflow.tasks if task_id in t.dependencies]
        for task in dependents:
            # Check if all dependencies are complete
            if all(d in workflow.results for d in task.dependencies):
                # Resolve input references
                resolved_task = self._resolve_inputs(task, workflow.results)
                await self._enqueue_task(workflow_id, resolved_task)

        # Check if workflow is complete
        if len(workflow.results) == len(workflow.tasks):
            await self._complete_workflow(workflow)
```
## Processing Flow

### Task Processing
```mermaid
sequenceDiagram
    participant Queue as Redis Queue
    participant Worker as Celery Worker
    participant Router as Task Router
    participant Provider as AI Provider
    participant R2 as Cloudflare R2
    participant Events as Redis Events

    Queue->>Worker: Dequeue task
    Worker->>Router: Route task
    Router->>Router: Select provider
    Router->>Provider: Submit task
    loop Until complete
        Router->>Provider: Poll status
        Provider-->>Router: Status
    end
    Provider-->>Router: Result (image URL)
    Router->>Router: Download from provider
    Router->>Router: Convert to WebP + embed ICC (Display P3)
    Router->>R2: Save to compute/{type}s/{task_id}/
    R2-->>Router: Stored path
    Router->>Worker: Task complete
    Worker->>Events: Emit task.completed
    Note over Events: Backend relocates result<br/>from compute/ to images/
```
### Task Router

```python
class TaskRouter:
    def __init__(self, providers: dict[str, BaseProvider]):
        self.providers = providers

    def select_provider(self, task: Task) -> BaseProvider:
        """Select the best provider for a task."""
        if task.provider:
            # Explicit provider requested
            return self.providers[task.provider]

        # Auto-select based on task type and availability
        candidates = [
            p for p in self.providers.values()
            if p.supports_task_type(task.type)
        ]
        # Prioritize by: cost, speed, quality (configurable)
        return self._rank_providers(candidates, task)[0]

    async def execute_task(self, task: Task) -> TaskResult:
        """Execute a task using the appropriate provider."""
        provider = self.select_provider(task)

        # Submit task
        task_id = await provider.submit_task(task)

        # Wait for result
        result = await provider.get_result(task_id)

        # Store result in object storage
        if result.image_url:
            result.image_url = await self._store_result(result.image_url)

        return result
```
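The ranking step is configurable; a minimal sketch of what a cost/speed ranking might look like. The `cost_per_task` and `avg_latency_ms` attributes are illustrative assumptions, not the real provider interface:

```python
from dataclasses import dataclass

@dataclass
class ProviderStats:
    name: str
    cost_per_task: float    # USD per task (illustrative)
    avg_latency_ms: float   # observed average latency (illustrative)

def rank_providers(candidates, cost_weight=0.5, speed_weight=0.5):
    """Order candidates by weighted cost + latency; cheapest/fastest first."""
    def score(p):
        return cost_weight * p.cost_per_task + speed_weight * (p.avg_latency_ms / 1000)
    return sorted(candidates, key=score)

fal = ProviderStats("fal", cost_per_task=0.01, avg_latency_ms=2000)
replicate = ProviderStats("replicate", cost_per_task=0.02, avg_latency_ms=5000)
assert [p.name for p in rank_providers([replicate, fal])] == ["fal", "replicate"]
```

A quality dimension could be added the same way; the weights would come from configuration rather than defaults.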
## Storage & Result Handling
All Compute Server storage keys are prefixed with compute/ to isolate compute workspace from permanent content.
| Aspect | Details |
|---|---|
| Key prefix | compute/ — automatically prepended by R2ImageStorage |
| Result path | compute/{task_type}s/{task_id}/result_{index}.webp |
| Image format | WebP at 95% quality with Display P3 ICC profile |
| Storage backend | R2 when configured (R2_ENDPOINT_URL + R2_ACCESS_KEY_ID), falls back to local disk |
| Result relocation | Backend copies accepted results from compute/ to images/ via S3 CopyObject |
## Celery Workers

### Worker Configuration
```python
# celery_app.py
from celery import Celery

app = Celery(
    "compute",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)

app.conf.update(
    task_routes={
        "compute.tasks.high_priority.*": {"queue": "high-priority"},
        "compute.tasks.generation.*": {"queue": "generation"},
        "compute.tasks.processing.*": {"queue": "processing"},
    },
    task_default_queue="default",
    worker_prefetch_multiplier=1,
    task_acks_late=True,
)
```
### Task Queues

| Queue | Purpose | Concurrency |
|---|---|---|
| high-priority | Urgent tasks | High |
| generation | AI generation tasks | Limited by provider |
| processing | CPU-bound tasks | Per-core |
| default | General tasks | Medium |
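With the routing above, dedicated workers can be pinned to each queue. An illustrative startup (the module path is assumed from celery_app.py; concurrency values are examples, not tuned settings):

```shell
# One worker process per queue class; concurrency follows the table above
celery -A compute.celery_app worker -Q high-priority --concurrency=8
celery -A compute.celery_app worker -Q generation --concurrency=4
celery -A compute.celery_app worker -Q processing --concurrency=$(nproc)
celery -A compute.celery_app worker -Q default --concurrency=4
```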
### Task Implementation

```python
# tasks/generation.py
import asyncio

from celery import shared_task

from compute.router import TaskRouter

@shared_task(bind=True, max_retries=3)
def execute_task(self, workflow_id: str, task_data: dict):
    """Execute a single task in a workflow."""
    task = Task(**task_data)
    try:
        router = TaskRouter.get_instance()
        # Celery tasks are synchronous, so the async router runs in its own event loop
        result = asyncio.run(router.execute_task(task))
        # Notify DAG executor
        dag_executor.handle_task_complete(
            workflow_id=workflow_id,
            task_id=task.id,
            result=result,
        )
    except ProviderError as exc:
        # Retry with linear backoff: 60s, 120s, 180s
        self.retry(exc=exc, countdown=60 * (self.request.retries + 1))
    except Exception as exc:
        # Mark task as failed
        dag_executor.handle_task_failed(
            workflow_id=workflow_id,
            task_id=task.id,
            error=str(exc),
        )
```
## Redis Event Emission
When a task completes (or fails), the Compute Server emits events to a Redis event stream that the Backend subscribes to.
### Event Types

| Event | Description |
|---|---|
| task.started | Task execution began |
| task.progress | Task progress update |
| task.completed | Task finished successfully |
| task.failed | Task failed with error |
### Event Payload

```python
class TaskEvent:
    event: Literal["task.started", "task.progress", "task.completed", "task.failed"]
    task_id: UUID
    generation_id: UUID
    progress: dict | None  # For progress events
    error: str | None      # For failed events
```
### Event Emission Implementation

```python
import json

async def emit_task_event(redis: Redis, event: TaskEvent):
    """Emit task lifecycle event to Redis stream."""
    await redis.xadd(
        "compute:events",
        {
            "event": event.event,
            "task_id": str(event.task_id),
            "generation_id": str(event.generation_id),
            "payload": json.dumps(event.dict()),
        },
    )
```
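On the consumer side, the Backend reads entries from the compute:events stream (e.g. via XREAD) and parses the payload field. A minimal decoding sketch, assuming redis-py's convention of returning stream field names and values as bytes:

```python
import json

def decode_task_event(fields: dict[bytes, bytes]) -> dict:
    """Decode one compute:events stream entry produced by emit_task_event."""
    return json.loads(fields[b"payload"])

# Shape of a raw entry as a Redis client would return it
entry = {
    b"event": b"task.completed",
    b"task_id": b"1234",
    b"generation_id": b"5678",
    b"payload": b'{"event": "task.completed", "task_id": "1234"}',
}
assert decode_task_event(entry)["event"] == "task.completed"
```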
## Result Retrieval

The Backend fetches full results via GET /tasks/{id}/result after receiving a completion event:
Response:

```json
{
  "task_id": "uuid",
  "status": "completed",
  "predictions": [
    {
      "id": "uuid",
      "image_url": "https://media.sartiq.com/compute/generations/...",
      "metadata": {"seed": 12345, "model": "flux-dev"}
    }
  ],
  "timing": {"total_ms": 15000, "queue_ms": 500}
}
```
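A typed view of this payload on the Backend side might look like the following; the dataclass names are illustrative, not the Backend's actual models:

```python
import json
from dataclasses import dataclass

@dataclass
class Prediction:
    id: str
    image_url: str
    metadata: dict

@dataclass
class TaskResultPayload:
    task_id: str
    status: str
    predictions: list[Prediction]
    timing: dict

def parse_result(raw: str) -> TaskResultPayload:
    """Parse the GET /tasks/{id}/result response body."""
    data = json.loads(raw)
    return TaskResultPayload(
        task_id=data["task_id"],
        status=data["status"],
        predictions=[Prediction(**p) for p in data["predictions"]],
        timing=data["timing"],
    )

raw = (
    '{"task_id": "t1", "status": "completed",'
    ' "predictions": [{"id": "p1", "image_url": "https://example.com/x.webp",'
    ' "metadata": {"seed": 1}}], "timing": {"total_ms": 100, "queue_ms": 5}}'
)
result = parse_result(raw)
assert result.status == "completed"
assert result.predictions[0].metadata["seed"] == 1
```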
## Provider Details

### FAL.ai Integration

Primary provider for most generation tasks.
```python
class FALProvider(BaseProvider):
    def __init__(self, api_key: str):
        # AsyncClient matches the awaited calls below
        self.client = fal_client.AsyncClient(key=api_key)

    async def submit_task(self, task: Task) -> str:
        if task.type == TaskType.GENERATION:
            handle = await self.client.submit(
                "fal-ai/flux/dev",
                arguments={
                    "prompt": task.inputs["prompt"],
                    "image_size": task.config.get("size", "landscape_16_9"),
                    "num_images": task.config.get("num_images", 1),
                },
            )
            return handle.request_id

    async def get_result(self, task_id: str) -> TaskResult:
        result = await self.client.result("fal-ai/flux/dev", task_id)
        return TaskResult(
            success=True,
            image_url=result["images"][0]["url"],
            metadata={"seed": result.get("seed")},
        )
```
### Vertex AI Integration

Google's Gemini models for LLM tasks.
```python
class VertexProvider(BaseProvider):
    def __init__(self, project_id: str):
        # vertexai.init() configures the SDK globally and returns None
        vertexai.init(project=project_id)

    async def submit_task(self, task: Task) -> str:
        if task.type == TaskType.LLM:
            model = GenerativeModel("gemini-pro")
            response = await model.generate_content_async(
                task.inputs["prompt"]
            )
            # LLM calls complete inline, so the text is returned
            # directly rather than a task ID to poll
            return response.text
```
### Custom Workers

Local CPU workers for specialized processing.
```python
class CustomWorkerProvider(BaseProvider):
    """Handles CPU-bound tasks locally."""

    async def submit_task(self, task: Task) -> str:
        if task.type == TaskType.BACKGROUND_REMOVAL:
            # Use rembg or a similar library
            result = remove_background(task.inputs["image_url"])
            return await self._store_result(result)
```
## Key Directories

| Directory | Purpose |
|---|---|
| app/api/ | FastAPI routes (tasks, storage, training, admin) |
| app/services/task/ | Task processing, scheduling, and workflow execution |
| app/services/storage/ | Storage implementations (R2 and disk backends) |
| app/services/workflow/ | Workflow management and DAG execution |
| app/services/preprocessing/ | Image preprocessing utilities |
| app/tasks/ | Celery task definitions |
| app/utils/images/ | Image conversion, ICC profiles, saving strategies |
| app/core/ | Configuration and application setup |
## Related Documentation
- Backend Architecture - How Backend integrates with Compute
- API Reference - Compute API endpoints
- AI Provider Guides - Provider-specific details