Add a New Compute Server Task Type¶
How to add a new task type to the Compute Server. A task type defines a unit of work that can be dispatched, scheduled, processed, and tracked.
Architecture Overview¶
The compute server uses polymorphic task models with a dispatch map that routes each task type to its handler components:
```
API Request
 └─▶ TaskCreateAPI (union schema)
      └─▶ dispatch_map[task_type]
           ├─▶ CRUD Service (persistence)
           ├─▶ Processor (execution logic)
           └─▶ Scheduler (optional, execution timing)
```
Key files:
| File | Purpose |
|---|---|
| `app/models/tasks/task_type.py:5-16` | `TaskType` enum (11 values) |
| `app/models/tasks/task.py:16-188` | Base `Task` model (polymorphic, `task_type` discriminator) |
| `app/models/tasks/generation_task.py:12-61` | Concrete example (`GenerationTask`) |
| `app/services/task/tasks_config.py:155-262` | `dispatch_map` (uses `TaskHandlerComponents` TypedDict) |
| `app/services/task/processor/task_processor.py:37-219` | `BaseTaskProcessor` base class |
Step 1: Add the Enum Value¶
```python
# app/models/tasks/task_type.py
class TaskType(str, Enum):
    GENERATION = "generation"
    TRAINING = "training"
    # ... existing values ...
    YOUR_TASK = "your_task"  # ← add here
```
Step 2: Create the Entity Model¶
Create a new model using joined table inheritance from the base Task:
```python
# app/models/tasks/your_task.py
from uuid import UUID

from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column

from app.models.tasks.task import Task
from app.models.tasks.task_type import TaskType


class YourTask(Task):
    """Your task description."""

    __tablename__ = "your_tasks"

    id: Mapped[UUID] = mapped_column(
        ForeignKey("tasks.id"),
        primary_key=True,
    )

    # Add task-specific columns
    your_field: Mapped[str] = mapped_column(String(200))

    __mapper_args__ = {
        "polymorphic_identity": TaskType.YOUR_TASK,
    }
```
The base `Task` model at `app/models/tasks/task.py:16-188` provides the common fields: `id`, `task_type`, `status`, `provider_type`, `created_at`, `updated_at`, etc. Your model adds only the columns specific to this task type.

**ForeignKey Required:** The `id` column must have `ForeignKey("tasks.id")`; this is how SQLAlchemy's joined table inheritance works. The `polymorphic_identity` must match your `TaskType` enum value.
Step 3: Create Pydantic Schemas¶
Create four schemas following the project's naming convention:
```python
# app/schemas/tasks/your_task.py
from datetime import datetime
from uuid import UUID

from pydantic import BaseModel, ConfigDict, Field

from app.models.tasks.task_type import TaskType


class YourTaskCreate(BaseModel):
    """Schema for creating a your_task."""

    task_type: TaskType = TaskType.YOUR_TASK
    your_field: str = Field(min_length=1, max_length=200)


class YourTaskPublic(BaseModel):
    """Schema for API responses."""

    id: UUID
    task_type: TaskType
    status: str
    your_field: str
    created_at: datetime

    model_config = ConfigDict(from_attributes=True)


class YourTaskPublicUpdate(BaseModel):
    """Schema for status update responses."""

    id: UUID
    status: str


class YourTaskUpdate(BaseModel):
    """Schema for updating a your_task."""

    your_field: str | None = None
```
Step 4: Implement CRUD Service¶
Extend the base CRUD service for persistence:
```python
# app/services/task/crud/your_task_crud.py
from app.models.tasks.your_task import YourTask
from app.schemas.tasks.your_task import (
    YourTaskCreate,
    YourTaskUpdate,
)
from app.services.task.crud.base_task_crud import BaseTaskCRUD


class YourTaskCRUD(BaseTaskCRUD[YourTask, YourTaskCreate, YourTaskUpdate]):
    """CRUD operations for YourTask."""

    def __init__(self):
        super().__init__(model=YourTask)
```
Step 5: Implement Processor¶
The processor contains the actual execution logic. Extend BaseTaskProcessor:
```python
# app/services/task/processor/your_task_processor.py
from app.models.tasks.your_task import YourTask
from app.services.task.processor.task_processor import BaseTaskProcessor


class YourTaskProcessor(BaseTaskProcessor):
    """Processes your_task tasks."""

    async def process(self, task: YourTask) -> None:
        """Execute the task.

        This method is called by the task runner. Use _set_status()
        to update task state (it handles DB updates, Redis events,
        webhooks, and deduplication).
        """
        # 1. Extract parameters from the task model
        # 2. Call the appropriate provider/service
        # 3. Update task status on completion/failure
        await self._set_status(task, "processing")
        try:
            # ... your execution logic ...
            await self._set_status(task, "completed")
        except Exception as e:
            await self._set_status(task, "failed", error=str(e))
            raise
```
See `BaseTaskProcessor` at `app/services/task/processor/task_processor.py:37-219` for the full interface. The `_set_status()` method handles DB updates, Redis event publishing, webhook notifications, and deduplication.
Step 6: Implement Scheduler (Optional)¶
If your task needs custom scheduling logic (e.g., batching, priority queuing, delayed execution), implement a scheduler:
```python
# app/services/task/scheduler/your_task_scheduler.py
from app.models.tasks.your_task import YourTask
from app.services.task.scheduler.base_task_scheduler import BaseTaskScheduler


class YourTaskScheduler(BaseTaskScheduler):
    """Custom scheduling for your_task tasks."""

    async def schedule(self, task: YourTask) -> None:
        """Determine when and how to execute this task."""
        # Default behavior: immediate execution
        # Override for batching, delays, priority, etc.
        ...
```
Skip this step if the default scheduling behavior is sufficient.
Step 7: Create Database Migration¶
Generate an Alembic migration for the new table:
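The exact invocation depends on the project's Alembic setup; assuming a standard autogenerate workflow, the commands would look something like:

```shell
# Autogenerate a migration from the new model, then apply it
alembic revision --autogenerate -m "add your_tasks table"
alembic upgrade head
```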
Review the generated migration to ensure it creates the joined table with the correct foreign key to tasks.id.
Step 8: Register in the Dispatch Map¶
Add your task type to dispatch_map so the system knows how to handle it:
```python
# app/services/task/tasks_config.py
from app.services.task.crud.your_task_crud import YourTaskCRUD
from app.services.task.processor.your_task_processor import YourTaskProcessor

dispatch_map: dict[TaskType, TaskHandlerComponents] = {
    # ... existing entries ...
    TaskType.YOUR_TASK: {
        "crud": YourTaskCRUD(),
        "processor": YourTaskProcessor(),
        # "scheduler": YourTaskScheduler(),  # optional
    },
}
```
The `TaskHandlerComponents` TypedDict defines the shape of each entry. The dispatch map at `tasks_config.py:155-262` routes incoming tasks to the correct handlers.
Step 9: Update Schema Unions¶
Add your schemas to the discriminated unions so the API accepts/returns your task type:
```python
# In the schema union files:
TaskCreateAPI = YourTaskCreate | GenerationTaskCreate | ...
TaskPublicAPI = YourTaskPublic | GenerationTaskPublic | ...
TaskUpdateAPI = YourTaskUpdate | GenerationTaskUpdate | ...
```
These unions use the task_type field as the discriminator.
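A self-contained sketch of how a `task_type`-discriminated union validates in Pydantic v2 (the field names and member schemas here are illustrative, and `Literal` discriminator values are assumed; the project's real unions live in the schema union files):

```python
# Illustrative discriminated-union sketch (assumes pydantic v2).
from enum import Enum
from typing import Annotated, Literal, Union

from pydantic import BaseModel, Field, TypeAdapter


class TaskType(str, Enum):
    GENERATION = "generation"
    YOUR_TASK = "your_task"


class GenerationTaskCreate(BaseModel):
    task_type: Literal[TaskType.GENERATION] = TaskType.GENERATION
    prompt: str


class YourTaskCreate(BaseModel):
    task_type: Literal[TaskType.YOUR_TASK] = TaskType.YOUR_TASK
    your_field: str


# The discriminator tells pydantic which member to validate against
TaskCreateAPI = Annotated[
    Union[GenerationTaskCreate, YourTaskCreate],
    Field(discriminator="task_type"),
]

adapter = TypeAdapter(TaskCreateAPI)
task = adapter.validate_python({"task_type": "your_task", "your_field": "x"})
```

With the discriminator in place, pydantic dispatches straight to the matching member instead of trying each union member in turn, which also produces clearer validation errors.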
Step 10: Add Tests¶
Write tests covering:
- Model tests: polymorphic identity resolves correctly, fields validate
- CRUD tests: create, read, update operations
- Processor tests: `process()` handles success and failure paths
- API tests: POST creates task, GET returns correct schema, status updates propagate
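A processor test can often be written against a fake that records status transitions, without touching the database or Redis. The sketch below is self-contained and mirrors the `process()` skeleton from Step 5 (the class and method names here are stand-ins, not the real `BaseTaskProcessor` interface):

```python
# Hypothetical processor unit-test sketch using a recording fake.
import asyncio


class FakeProcessor:
    """Stands in for YourTaskProcessor; records status transitions."""

    def __init__(self):
        self.statuses = []

    async def _set_status(self, task, status, error=None):
        # In the real base class this hits the DB, Redis, and webhooks;
        # here we only record the transition for assertions.
        self.statuses.append(status)

    async def process(self, task):
        await self._set_status(task, "processing")
        try:
            # ... execution logic would run here ...
            await self._set_status(task, "completed")
        except Exception as e:
            await self._set_status(task, "failed", error=str(e))
            raise


proc = FakeProcessor()
asyncio.run(proc.process(object()))
```

In a real pytest suite the same idea would be expressed with `pytest.mark.asyncio` and a mocked `_set_status`; the failure path is covered by making the execution logic raise and asserting the recorded statuses end in `"failed"`.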
Checklist¶
- Enum value added to `TaskType`
- Entity model with joined table inheritance and `polymorphic_identity`
- Pydantic schemas (Create, Public, PublicUpdate, Update)
- CRUD service extending `BaseTaskCRUD`
- Processor extending `BaseTaskProcessor` with `process()` implemented
- Scheduler (if needed) extending `BaseTaskScheduler`
- Alembic migration created and tested
- Registered in `dispatch_map` at `tasks_config.py`
- Schema unions updated (`TaskCreateAPI`, `TaskPublicAPI`, `TaskUpdateAPI`)
- Tests for model, CRUD, processor, and API
See Also¶
- Database Patterns (SQLAlchemy) -- Model definitions, mixins, and session lifecycle for task models
- Add a New AI Provider -- Integrating a new AI provider that your task processor can call