
Add a New Compute Server Task Type

This guide explains how to add a new task type to the Compute Server. A task type defines a unit of work that can be dispatched, scheduled, processed, and tracked.


Architecture Overview

The compute server uses polymorphic task models with a dispatch map that routes each task type to its handler components:

API Request
  └─▶ TaskCreateAPI (union schema)
        └─▶ dispatch_map[task_type]
              ├─▶ CRUD Service (persistence)
              ├─▶ Processor (execution logic)
              └─▶ Scheduler (optional, execution timing)

Key files:

File                                                  Purpose
app/models/tasks/task_type.py:5-16                    TaskType enum (11 values)
app/models/tasks/task.py:16-188                       Base Task model (polymorphic, task_type discriminator)
app/models/tasks/generation_task.py:12-61             Concrete example (GenerationTask)
app/services/task/tasks_config.py:155-262             dispatch_map (uses TaskHandlerComponents TypedDict)
app/services/task/processor/task_processor.py:37-219  BaseTaskProcessor base class

Step 1: Add the Enum Value

# app/models/tasks/task_type.py

class TaskType(str, Enum):
    GENERATION = "generation"
    TRAINING = "training"
    # ... existing values ...
    YOUR_TASK = "your_task"  # ← add here
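
TaskType subclasses str as well as Enum, so members compare and serialize as plain strings — which is what makes the value usable as a database discriminator and JSON field. A minimal standalone illustration (the enum here is a stand-in, not the project's full TaskType):

```python
from enum import Enum


class TaskType(str, Enum):
    YOUR_TASK = "your_task"


# str-backed enums compare equal to their raw string value...
print(TaskType.YOUR_TASK == "your_task")  # True
# ...and round-trip cleanly from strings coming out of the DB or API.
print(TaskType("your_task") is TaskType.YOUR_TASK)  # True
```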

Step 2: Create the Entity Model

Create a new model using joined table inheritance from the base Task:

# app/models/tasks/your_task.py

from uuid import UUID

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column

from app.models.tasks.task import Task
from app.models.tasks.task_type import TaskType


class YourTask(Task):
    """Your task description."""

    __tablename__ = "your_tasks"

    id: Mapped[UUID] = mapped_column(
        ForeignKey("tasks.id"),
        primary_key=True,
    )

    # Add task-specific columns
    your_field: Mapped[str] = mapped_column(String(200))

    __mapper_args__ = {
        "polymorphic_identity": TaskType.YOUR_TASK,
    }

The base Task model at app/models/tasks/task.py:16-188 provides common fields: id, task_type, status, provider_type, created_at, updated_at, etc. Your model adds only the columns specific to this task type.

ForeignKey Required

The id column must have ForeignKey("tasks.id") — this is how SQLAlchemy's joined table inheritance works. The polymorphic_identity must match your TaskType enum value.
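
To see how the pieces fit, here is a self-contained sketch of joined table inheritance (stand-in classes with integer keys and a string discriminator, not the project's actual models), assuming SQLAlchemy 2.0. Querying the base class returns instances of the concrete subclass, resolved via the discriminator:

```python
from sqlalchemy import ForeignKey, String, create_engine, select
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Task(Base):
    __tablename__ = "tasks"
    id: Mapped[int] = mapped_column(primary_key=True)
    task_type: Mapped[str] = mapped_column(String(50))
    __mapper_args__ = {
        "polymorphic_identity": "task",
        "polymorphic_on": "task_type",  # discriminator column
    }


class YourTask(Task):
    __tablename__ = "your_tasks"
    # ForeignKey to the base table is what makes this *joined* inheritance.
    id: Mapped[int] = mapped_column(ForeignKey("tasks.id"), primary_key=True)
    your_field: Mapped[str] = mapped_column(String(200))
    __mapper_args__ = {"polymorphic_identity": "your_task"}


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(YourTask(your_field="hello"))
    session.commit()
    # Querying the base class returns the concrete subclass; SQLAlchemy
    # wrote "your_task" into task_type automatically on insert.
    loaded = session.scalars(select(Task)).one()
    print(type(loaded).__name__, loaded.task_type)  # YourTask your_task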


Step 3: Create Pydantic Schemas

Create four schemas following the project's naming convention:

# app/schemas/tasks/your_task.py

from datetime import datetime
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field

from app.models.tasks.task_type import TaskType


class YourTaskCreate(BaseModel):
    """Schema for creating a your_task."""
    task_type: TaskType = TaskType.YOUR_TASK
    your_field: str = Field(min_length=1, max_length=200)


class YourTaskPublic(BaseModel):
    """Schema for API responses."""
    id: UUID
    task_type: TaskType
    status: str
    your_field: str
    created_at: datetime
    model_config = ConfigDict(from_attributes=True)


class YourTaskPublicUpdate(BaseModel):
    """Schema for status update responses."""
    id: UUID
    status: str


class YourTaskUpdate(BaseModel):
    """Schema for updating a your_task."""
    your_field: str | None = None
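
A quick self-contained illustration of how the Create and Public schemas behave, assuming Pydantic v2 (the enum and ORM row here are stand-ins for the project's types):

```python
from datetime import datetime
from enum import Enum
from uuid import UUID, uuid4

from pydantic import BaseModel, ConfigDict, Field


class TaskType(str, Enum):
    YOUR_TASK = "your_task"


class YourTaskCreate(BaseModel):
    task_type: TaskType = TaskType.YOUR_TASK
    your_field: str = Field(min_length=1, max_length=200)


class YourTaskPublic(BaseModel):
    id: UUID
    task_type: TaskType
    status: str
    your_field: str
    created_at: datetime
    model_config = ConfigDict(from_attributes=True)


# Request bodies may omit task_type; the default fills it in.
create = YourTaskCreate.model_validate({"your_field": "hello"})


class FakeOrmRow:  # stand-in for a YourTask ORM row
    id = uuid4()
    task_type = TaskType.YOUR_TASK
    status = "queued"
    your_field = "hello"
    created_at = datetime(2024, 1, 1)


# from_attributes=True lets the Public schema read ORM objects directly.
public = YourTaskPublic.model_validate(FakeOrmRow())
print(create.task_type.value, public.status)  # your_task queued
```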

Step 4: Implement CRUD Service

Extend the base CRUD service for persistence:

# app/services/task/crud/your_task_crud.py

from app.services.task.crud.base_task_crud import BaseTaskCRUD
from app.models.tasks.your_task import YourTask
from app.schemas.tasks.your_task import (
    YourTaskCreate,
    YourTaskPublic,
    YourTaskUpdate,
)


class YourTaskCRUD(BaseTaskCRUD[YourTask, YourTaskCreate, YourTaskUpdate]):
    """CRUD operations for YourTask."""

    def __init__(self):
        super().__init__(model=YourTask)
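
The generic parameters give each CRUD service type-safe operations against its own model and schemas. The real base class is async and DB-backed; a hypothetical in-memory sketch of the generic pattern it follows:

```python
from typing import Generic, TypeVar
from uuid import UUID, uuid4

ModelT = TypeVar("ModelT")
CreateT = TypeVar("CreateT")
UpdateT = TypeVar("UpdateT")


class BaseTaskCRUD(Generic[ModelT, CreateT, UpdateT]):
    """In-memory stand-in; the real base class is async and DB-backed."""

    def __init__(self, model: type[ModelT]) -> None:
        self.model = model
        self._store: dict[UUID, ModelT] = {}

    def create(self, **fields) -> ModelT:
        obj = self.model(**fields)
        self._store[uuid4()] = obj
        return obj

    def list_all(self) -> list[ModelT]:
        return list(self._store.values())


class YourTask:
    def __init__(self, your_field: str) -> None:
        self.your_field = your_field


class YourTaskCRUD(BaseTaskCRUD[YourTask, dict, dict]):
    def __init__(self) -> None:
        super().__init__(model=YourTask)


crud = YourTaskCRUD()
crud.create(your_field="hello")
print(len(crud.list_all()))  # 1
```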

Step 5: Implement Processor

The processor contains the actual execution logic. Extend BaseTaskProcessor:

# app/services/task/processor/your_task_processor.py

from app.services.task.processor.task_processor import BaseTaskProcessor
from app.models.tasks.your_task import YourTask


class YourTaskProcessor(BaseTaskProcessor):
    """Processes your_task tasks."""

    async def process(self, task: YourTask) -> None:
        """Execute the task.

        This method is called by the task runner. Use _set_status()
        to update task state (handles DB updates, Redis events,
        webhooks, and deduplication).
        """
        # 1. Extract parameters from the task model
        # 2. Call the appropriate provider/service
        # 3. Update task status on completion/failure
        await self._set_status(task, "processing")
        try:
            # ... your execution logic ...
            await self._set_status(task, "completed")
        except Exception as e:
            await self._set_status(task, "failed", error=str(e))
            raise

See BaseTaskProcessor at app/services/task/processor/task_processor.py:37-219 for the full interface. The _set_status() method handles DB updates, Redis event publishing, webhook notifications, and deduplication.
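
The status flow above can be exercised end to end with minimal stand-ins (not the real classes — the real _set_status() also writes to the DB and publishes Redis/webhook events):

```python
import asyncio


class FakeTask:
    def __init__(self, should_fail: bool = False) -> None:
        self.should_fail = should_fail
        self.status = "queued"
        self.error = None


class BaseTaskProcessor:
    async def _set_status(self, task, status, error=None):
        # Stand-in: only mutates the task; the real method persists state.
        task.status = status
        task.error = error


class YourTaskProcessor(BaseTaskProcessor):
    async def process(self, task):
        await self._set_status(task, "processing")
        try:
            if task.should_fail:
                raise RuntimeError("provider error")
            await self._set_status(task, "completed")
        except Exception as e:
            await self._set_status(task, "failed", error=str(e))
            raise


task = FakeTask()
asyncio.run(YourTaskProcessor().process(task))
print(task.status)  # completed
```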


Step 6: Implement Scheduler (Optional)

If your task needs custom scheduling logic (e.g., batching, priority queuing, delayed execution), implement a scheduler:

# app/services/task/scheduler/your_task_scheduler.py

from app.services.task.scheduler.base_task_scheduler import BaseTaskScheduler
from app.models.tasks.your_task import YourTask


class YourTaskScheduler(BaseTaskScheduler):
    """Custom scheduling for your_task tasks."""

    async def schedule(self, task: YourTask) -> None:
        """Determine when and how to execute this task."""
        # Default behavior: immediate execution
        # Override for batching, delays, priority, etc.
        ...

Skip this step if the default scheduling behavior is sufficient.
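
As one concrete example of custom timing, a delayed-execution scheduler might look like this hypothetical sketch (stand-in base class; the real interface lives in base_task_scheduler.py and may differ):

```python
import asyncio


class BaseTaskScheduler:
    async def schedule(self, task) -> None:
        raise NotImplementedError


class DelayedScheduler(BaseTaskScheduler):
    """Waits a fixed delay before the task becomes eligible to run."""

    def __init__(self, delay_seconds: float) -> None:
        self.delay_seconds = delay_seconds

    async def schedule(self, task) -> None:
        await asyncio.sleep(self.delay_seconds)
        task.scheduled = True  # hand-off point to the processor


class FakeTask:
    scheduled = False


task = FakeTask()
asyncio.run(DelayedScheduler(0.0).schedule(task))
print(task.scheduled)  # True
```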


Step 7: Create Database Migration

Generate an Alembic migration for the new table:

alembic revision --autogenerate -m "add your_tasks table"
alembic upgrade head

Review the generated migration to ensure it creates the joined table with the correct foreign key to tasks.id.
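
If autogeneration picks everything up, the upgrade step should look roughly like this hypothetical sketch (exact types and constraint names will vary with your model and database):

```python
def upgrade() -> None:
    op.create_table(
        "your_tasks",
        sa.Column("id", sa.Uuid(), nullable=False),
        sa.Column("your_field", sa.String(length=200), nullable=False),
        sa.ForeignKeyConstraint(["id"], ["tasks.id"]),
        sa.PrimaryKeyConstraint("id"),
    )
```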


Step 8: Register in the Dispatch Map

Add your task type to dispatch_map so the system knows how to handle it:

# app/services/task/tasks_config.py

from app.services.task.crud.your_task_crud import YourTaskCRUD
from app.services.task.processor.your_task_processor import YourTaskProcessor

dispatch_map: dict[TaskType, TaskHandlerComponents] = {
    # ... existing entries ...
    TaskType.YOUR_TASK: {
        "crud": YourTaskCRUD(),
        "processor": YourTaskProcessor(),
        # "scheduler": YourTaskScheduler(),  # optional
    },
}

The TaskHandlerComponents TypedDict defines the shape. The dispatch map at tasks_config.py:155-262 routes incoming tasks to the correct handlers.
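
A hypothetical sketch of the TypedDict shape and the lookup it enables (the real TaskHandlerComponents in tasks_config.py may name its keys and types differently):

```python
from enum import Enum
from typing import TypedDict


class TaskType(str, Enum):
    YOUR_TASK = "your_task"


class Crud: ...
class Processor: ...


class TaskHandlerComponents(TypedDict, total=False):
    crud: Crud
    processor: Processor
    scheduler: object  # present only for task types with custom scheduling


dispatch_map: dict[TaskType, TaskHandlerComponents] = {
    TaskType.YOUR_TASK: {"crud": Crud(), "processor": Processor()},
}

# Routing: look up the handler bundle for an incoming task's type.
processor = dispatch_map[TaskType.YOUR_TASK]["processor"]
print(type(processor).__name__)  # Processor
```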


Step 9: Update Schema Unions

Add your schemas to the discriminated unions so the API accepts/returns your task type:

# In the schema union files:
TaskCreateAPI = YourTaskCreate | GenerationTaskCreate | ...
TaskPublicAPI = YourTaskPublic | GenerationTaskPublic | ...
TaskUpdateAPI = YourTaskUpdate | GenerationTaskUpdate | ...

These unions use the task_type field as the discriminator.
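
If these are Pydantic v2 discriminated unions, each member must declare task_type as a Literal for the discriminator to work. A self-contained sketch (schemas here are simplified stand-ins):

```python
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class GenerationTaskCreate(BaseModel):
    task_type: Literal["generation"] = "generation"
    prompt: str


class YourTaskCreate(BaseModel):
    task_type: Literal["your_task"] = "your_task"
    your_field: str


# The discriminator routes each payload to the matching member schema.
TaskCreateAPI = Annotated[
    Union[GenerationTaskCreate, YourTaskCreate],
    Field(discriminator="task_type"),
]

adapter = TypeAdapter(TaskCreateAPI)
parsed = adapter.validate_python({"task_type": "your_task", "your_field": "x"})
print(type(parsed).__name__)  # YourTaskCreate
```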


Step 10: Add Tests

Write tests covering:

  • Model tests: Polymorphic identity resolves correctly, fields validate
  • CRUD tests: Create, read, update operations
  • Processor tests: process() handles success and failure paths
  • API tests: POST creates task, GET returns correct schema, status updates propagate
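
A hypothetical sketch of the failure-path processor test, using stand-in classes so it runs anywhere; a real test would import the actual YourTaskProcessor and use pytest's async support:

```python
import asyncio


class FakeTask:
    status = "queued"
    error = None


class FailingProcessor:
    async def _set_status(self, task, status, error=None):
        task.status, task.error = status, error

    async def process(self, task):
        await self._set_status(task, "processing")
        try:
            raise RuntimeError("provider unavailable")
        except Exception as e:
            await self._set_status(task, "failed", error=str(e))
            raise  # the runner should still see the failure


def test_process_failure_sets_failed_status():
    task = FakeTask()
    try:
        asyncio.run(FailingProcessor().process(task))
    except RuntimeError:
        pass  # expected: process() re-raises after recording the failure
    assert task.status == "failed"
    assert "provider unavailable" in task.error


test_process_failure_sets_failed_status()
```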

Checklist

  • Enum value added to TaskType
  • Entity model with joined table inheritance and polymorphic_identity
  • Pydantic schemas (Create, Public, PublicUpdate, Update)
  • CRUD service extending BaseTaskCRUD
  • Processor extending BaseTaskProcessor with process() implemented
  • Scheduler (if needed) extending BaseTaskScheduler
  • Alembic migration created and tested
  • Registered in dispatch_map at tasks_config.py
  • Schema unions updated (TaskCreateAPI, TaskPublicAPI, TaskUpdateAPI)
  • Tests for model, CRUD, processor, and API

See Also