Task Management System Class Diagram
Complete System Architecture
The following class diagram illustrates the relationships between all components in the Task Management System:
classDiagram
%% Core Model Classes
class Base {
<<abstract>>
+UUID id
+datetime created_at
+datetime updated_at
}
class Task {
<<entity>>
+TaskType task_type
+string queue_name
+int priority
+string origin_instance
+string origin_zone
+string origin_tenant_id
+TaskStatus status
+datetime started_at
+datetime completed_at
+string executor_type
+string executor_instance
+string celery_task_id
+string service_task_id
+List~string~ errors
+int retry_count
+float progress
+bool is_resumed
+int resume_attempt_count
+datetime last_resume_attempt
+mark_as_resumed()
+add_error(error_message)
+get_last_error()
}
class GenerationTask {
<<entity>>
+UUID backend_generation_id
+UUID backend_prediction_id
+string subject_model_path
+string product_model_path
+string style_model_path
+string product2_model_path
+float subject_lora_weight
+float product_lora_weight
+float style_lora_weight
+float product2_lora_weight
+string comfy_prompt_id
+string workflow_id
+string prompt
+int width
+int height
+int seed
+string result_image_path
+string comfy_server_url
+string comfy_workflow_data
+List~string~ comfyui_retry_server_urls
+can_resume_comfyui() bool
+add_generation_error(error_message, server_url)
+get_generation_error_history() List~string~
+get_comfyui_retry_server_history() List~string~
}
%% Enums
class TaskType {
<<enumeration>>
GENERATION
}
class TaskStatus {
<<enumeration>>
PENDING
EXECUTING
COMPLETED
FAILED
CANCELLED
}
%% Schema Classes
class TaskBase {
<<schema>>
+TaskType task_type
+int priority
+string origin_instance
+string origin_zone
+string origin_tenant_id
}
class CreateTask {
<<schema>>
}
class TaskPublic {
<<schema>>
+UUID id
+datetime created_at
+datetime updated_at
+TaskStatus status
+datetime started_at
+datetime completed_at
+string executor_type
+string executor_instance
+string celery_task_id
+string service_task_id
+List~string~ errors
+int retry_count
+float progress
}
class TaskPublicUpdate {
<<schema>>
+TaskStatus status
+int priority
+string executor_type
+string executor_instance
+List~string~ errors
}
class TaskUpdate {
<<schema>>
+datetime created_at
+datetime updated_at
+TaskType task_type
+string origin_instance
+string origin_zone
+string origin_tenant_id
+datetime started_at
+datetime completed_at
+string celery_task_id
+string service_task_id
+int retry_count
+float progress
+bool is_resumed
+int resume_attempt_count
+datetime last_resume_attempt
}
class CreateGenerationTask {
<<schema>>
+string subject_model_path
+string product_model_path
+string style_model_path
+string product2_model_path
+float subject_lora_weight
+float product_lora_weight
+float style_lora_weight
+float product2_lora_weight
+string prompt
+int width
+int height
+int seed
}
class GenerationTaskPublic {
<<schema>>
+UUID backend_generation_id
+UUID backend_prediction_id
+string subject_model_path
+string product_model_path
+string style_model_path
+string product2_model_path
+float subject_lora_weight
+float product_lora_weight
+float style_lora_weight
+float product2_lora_weight
+string workflow_id
+string prompt
+int width
+int height
+int seed
+string result_image_path
+List~string~ comfyui_retry_server_urls
}
class GenerationTaskPublicUpdate {
<<schema>>
}
class GenerationTaskUpdate {
<<schema>>
+UUID backend_generation_id
+UUID backend_prediction_id
+string subject_model_path
+string product_model_path
+string style_model_path
+string product2_model_path
+float subject_lora_weight
+float product_lora_weight
+float style_lora_weight
+float product2_lora_weight
+string workflow_id
+string comfy_prompt_id
+string prompt
+int width
+int height
+int seed
+string result_image_path
+string comfy_server_url
+string comfy_workflow_data
+List~string~ comfyui_retry_server_urls
}
%% Service Classes
class BaseTaskCRUD~T, CreateSchemaType, UpdateSchemaType, PublicSchemaType~ {
<<abstract>>
#Task model
+create(db, create_schema) T
+get(db, task_id) T
+get_multi(db, skip, limit, filters) List~T~
+update(db, task_id, update_data) T
+delete(db, task_id) T
+update_status(db, task_id, status) T
#_create_entity_from_schema(create_schema) T
#_update_entity_from_schema(entity, update_schema) T
}
class GenerationTaskCRUD {
<<service>>
+get_by_workflow(db, workflow_id, limit) List~GenerationTask~
+get_by_status_and_workflow(db, status, workflow_id, limit) List~GenerationTask~
+get_by_backend_prediction_id(db, backend_prediction_id) GenerationTask
+get_pending_tasks(db, queue_name, limit) List~GenerationTask~
+update_result(db, task_id, result_image_path, status) GenerationTask
}
class BaseTaskProcessor~T, C~ {
<<abstract>>
+process(task, crud)* void
#_set_status(task, crud, status, progress, error_message) void
}
class GenerationTaskProcessor {
<<service>>
-image_storage
-settings
+process(task, crud) void
-_resume_comfyui_task(task, crud) void
-_start_fresh_execution(task, crud) void
-_check_comfyui_status(task) dict
-_execute_generation_task(task, crud) void
-_handle_comfyui_completion(task, crud, comfy_status) void
-_handle_comfyui_failure(task, crud, comfy_status) void
-_handle_resume_failure(task, crud, error_msg) void
-_save_result_image(task, result, comfy_service) string
}
class BaseTaskScheduler {
<<service>>
+schedule_task(task, priority) string
+reschedule_task(task) string
+cancel_task(task_id, publish_to_redis) bool
}
class GenerationTaskScheduler {
<<service>>
+cancel_task(task_id) bool
}
%% Configuration Classes
class TaskHandlerComponents {
<<interface>>
+crud() BaseTaskCRUD
+processor() BaseTaskProcessor
+scheduler() BaseTaskScheduler
+Type~BaseModel~ create_schema
+Type~BaseModel~ public_schema
+Type~BaseModel~ public_update_schema
+Type~BaseModel~ update_schema
}
class DispatchMap {
<<configuration>>
+Dict~TaskType, TaskHandlerComponents~ map
+get_handlers(task_type) TaskHandlerComponents
}
%% API Layer
class TasksAPI {
<<controller>>
+create_task(task_data) TaskPublicAPI
+list_tasks(filters) List~TaskPublicAPI~
+get_task(task_id) TaskPublicAPI
+update_task(task_id, update_data) TaskPublicAPI
+delete_task(task_id) TaskPublicAPI
+cancel_task(task_id) TaskPublicAPI
}
%% Queue Layer
class CeleryTask {
<<queue>>
+process_task(task_id) void
}
class TaskProcessor {
<<service>>
+process_task_business_logic(task_id) void
}
%% Relationships - Inheritance
Base <|-- Task : inherits
Task <|-- GenerationTask : polymorphic inheritance
TaskBase <|-- CreateTask : extends
TaskBase <|-- TaskPublic : extends
CreateTask <|-- CreateGenerationTask : extends
TaskPublic <|-- GenerationTaskPublic : extends
TaskPublicUpdate <|-- GenerationTaskPublicUpdate : extends
TaskUpdate <|-- GenerationTaskUpdate : extends
TaskPublicUpdate <|-- TaskUpdate : extends
BaseTaskCRUD <|-- GenerationTaskCRUD : implements
BaseTaskProcessor <|-- GenerationTaskProcessor : implements
BaseTaskScheduler <|-- GenerationTaskScheduler : extends
%% Relationships - Composition
Task "1" *-- "1" TaskType : has
Task "1" *-- "1" TaskStatus : has
GenerationTaskCRUD ..> GenerationTask : manages
GenerationTaskProcessor ..> GenerationTask : processes
GenerationTaskScheduler ..> GenerationTask : schedules
%% Relationships - Usage
TaskHandlerComponents ..> BaseTaskCRUD : creates
TaskHandlerComponents ..> BaseTaskProcessor : creates
TaskHandlerComponents ..> BaseTaskScheduler : creates
DispatchMap "1" *-- "*" TaskHandlerComponents : contains
TasksAPI ..> DispatchMap : uses
TasksAPI ..> CreateTask : validates
TasksAPI ..> TaskPublic : returns
TasksAPI ..> TaskPublicUpdate : accepts
CeleryTask ..> TaskProcessor : delegates to
TaskProcessor ..> DispatchMap : resolves services
TaskProcessor ..> BaseTaskProcessor : executes
%% Schema relationships
GenerationTaskCRUD ..> CreateGenerationTask : accepts
GenerationTaskCRUD ..> GenerationTaskUpdate : accepts
GenerationTaskCRUD ..> GenerationTaskPublic : returns
Component Explanations¶
1. Model Layer (Entities)¶
Task (Abstract Base Entity)¶
- Purpose: Provides common fields and polymorphic inheritance base
- Key Fields: id, task_type, status, priority, origin_instance, timestamps
- Polymorphic Configuration: Uses task_type as discriminator column
- Database Strategy: Joined-table inheritance — a single base table holds the shared fields, with joined tables for specific task data
GenerationTask (Concrete Entity)¶
- Purpose: Extends Task with generation-specific fields
- Key Fields: Model paths, LoRA weights, prompt, dimensions, result path
- Inheritance: Inherits all base task functionality from Task
- Storage: Additional fields stored in separate generation_tasks table
Enums (TaskStatus, TaskType)¶
- TaskStatus: Defines task lifecycle states (PENDING → EXECUTING → COMPLETED/FAILED, or CANCELLED when a task is cancelled)
- TaskType: Registry of available task types (currently GENERATION)
- Usage: Referenced by Task entity and used for service dispatch
2. Service Layer¶
CRUD Services (BaseTaskCRUD → GenerationTaskCRUD)¶
BaseTaskCRUD (Generic Abstract Base)
- Responsibility: Common database operations for all task types
- Generic Type: BaseTaskCRUD[T, CreateSchemaType, UpdateSchemaType, PublicSchemaType]
- Key Methods:
- create(): Convert schema to entity and persist
- get()/get_multi(): Retrieve tasks with filtering
- update(): Apply updates with validation
- delete(): Remove tasks
- update_status(): Status updates with timestamp management
- Extension Points:
- _create_entity_from_schema(): Custom schema-to-entity conversion
- _update_entity_from_schema(): Custom update logic
GenerationTaskCRUD (Concrete Implementation)
- Inheritance: Extends BaseTaskCRUD with GenerationTask-specific operations
- Additional Methods:
- get_by_workflow(): Filter by workflow ID
- get_by_backend_prediction_id(): Find a task by its backend prediction ID
- update_result(): Update with generated image path
- Customization: Overrides _create_entity_from_schema() to set default workflow
Processor Services (BaseTaskProcessor → GenerationTaskProcessor)¶
BaseTaskProcessor (Abstract Base)
- Responsibility: Task execution with progress tracking and event publishing
- Generic Type: BaseTaskProcessor[T, C] where T=Task, C=CRUD
- Key Methods:
- process(): Abstract method for business logic (must implement)
- _set_status(): Unified status updates with Redis events
- Features:
- Automatic timestamp management (started_at, completed_at)
- Redis stream publishing for real-time updates
- Error handling and progress tracking
GenerationTaskProcessor (Concrete Implementation)
- Inheritance: Extends BaseTaskProcessor for generation-specific logic
- Dependencies: ComfyUI service, image storage, workflow definitions
- Process Flow:
1. Adapt task data to workflow parameters
2. Execute ComfyUI workflow with progress tracking
3. Download and save result images
4. Update task with final results
- Progress Tracking: Uses _set_status() throughout process for real-time updates
Scheduler Services (BaseTaskScheduler → GenerationTaskScheduler)¶
BaseTaskScheduler (Base Implementation)
- Responsibility: Queue management and task lifecycle control
- Key Methods:
- schedule_task(): Submit task to Celery queue
- cancel_task(): Cancel queued/running tasks
- Features:
- Celery integration with queue routing
- Task cancellation with status updates
- Redis event publishing for cancellations
GenerationTaskScheduler (Enhanced Implementation)
- Additional Features: ComfyUI-specific cancellation
- Enhanced Cancellation:
- Cancels both Celery task and ComfyUI prompt
- Handles multiple ComfyUI servers
- Graceful degradation if ComfyUI unavailable
3. Configuration Layer¶
DispatchMap¶
- Purpose: Central service registry mapping task types to their implementations
- Structure: Dict[TaskType, TaskHandlerComponents]
- Components Per Task Type:
- CRUD factory function
- Processor factory function
- Scheduler factory function
- Schema type definitions (create, public, public update, update)
- Benefits: Type-safe service resolution, easy task type registration
TaskHandlerComponents (TypedDict)¶
- Purpose: Type-safe configuration structure for each task type
- Fields: All required service factories and schema types
- Validation: Static type checking via TypedDict (enforced by the type checker, not at runtime)
4. API Layer¶
TasksAPI¶
- Purpose: REST endpoints for task management
- Service Resolution: Uses DispatchMap to get appropriate services
- Endpoints:
- Generic: CRUD operations for all task types
- Specific: Type-specific operations (e.g., /generation/)
- Error Handling: Comprehensive exception handling with appropriate HTTP status codes
Interaction Patterns¶
1. Task Creation Flow¶
2. Task Processing Flow¶
Celery → TaskProcessor → DispatchMap → Specific Processor
Processor → CRUD Service (status updates)
Processor → Redis Streams (events)
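3. Service Resolution Pattern¶
TaskType → DispatchMap.get_handlers(task_type) → TaskHandlerComponents
TaskHandlerComponents → crud() / processor() / scheduler() factories → service instances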
Key Design Benefits¶
1. Extensibility¶
- Adding Task Types: Only requires implementing 3 services + configuration
- No Existing Code Changes: New types don't affect existing functionality
- Type Safety: Static type checking of service configurations
2. Separation of Concerns¶
- CRUD: Pure database operations
- Processor: Business logic with progress tracking
- Scheduler: Queue management and lifecycle control
- API: HTTP interface and validation
3. Testability¶
- Service Isolation: Each service can be tested independently
- Dependency Injection: Services can be mocked for testing
- Clear Interfaces: Abstract base classes define contracts
4. Maintainability¶
- Single Responsibility: Each component has a focused purpose
- Configuration-Driven: Easy to understand service relationships
- Consistent Patterns: All task types follow the same architecture
This architecture provides a robust, scalable foundation for task management while maintaining code clarity and enabling easy extension for new task types.