Task Management System Class Diagram

Complete System Architecture

The following class diagram illustrates the relationships between all components in the Task Management System:

classDiagram
    %% Core Model Classes
    class Base {
        <<abstract>>
        +UUID id
        +datetime created_at
        +datetime updated_at
    }

    class Task {
        <<entity>>
        +TaskType task_type
        +string queue_name
        +int priority
        +string origin_instance
        +string origin_zone
        +string origin_tenant_id
        +TaskStatus status
        +datetime started_at
        +datetime completed_at
        +string executor_type
        +string executor_instance
        +string celery_task_id
        +string service_task_id
        +List~string~ errors
        +int retry_count
        +float progress
        +bool is_resumed
        +int resume_attempt_count
        +datetime last_resume_attempt
        +mark_as_resumed()
        +add_error(error_message)
        +get_last_error()
    }

    class GenerationTask {
        <<entity>>
        +UUID backend_generation_id
        +UUID backend_prediction_id
        +string subject_model_path
        +string product_model_path
        +string style_model_path
        +string product2_model_path
        +float subject_lora_weight
        +float product_lora_weight
        +float style_lora_weight
        +float product2_lora_weight
        +string comfy_prompt_id
        +string workflow_id
        +string prompt
        +int width
        +int height
        +int seed
        +string result_image_path
        +string comfy_server_url
        +string comfy_workflow_data
        +List~string~ comfyui_retry_server_urls
        +can_resume_comfyui() bool
        +add_generation_error(error_message, server_url)
        +get_generation_error_history() List~string~
        +get_comfyui_retry_server_history() List~string~
    }

    %% Enums
    class TaskType {
        <<enumeration>>
        GENERATION
    }

    class TaskStatus {
        <<enumeration>>
        PENDING
        EXECUTING
        COMPLETED
        FAILED
        CANCELLED
    }

    %% Schema Classes
    class TaskBase {
        <<schema>>
        +TaskType task_type
        +int priority
        +string origin_instance
        +string origin_zone
        +string origin_tenant_id
    }

    class CreateTask {
        <<schema>>
    }

    class TaskPublic {
        <<schema>>
        +UUID id
        +datetime created_at
        +datetime updated_at
        +TaskStatus status
        +datetime started_at
        +datetime completed_at
        +string executor_type
        +string executor_instance
        +string celery_task_id
        +string service_task_id
        +List~string~ errors
        +int retry_count
        +float progress
    }

    class TaskPublicUpdate {
        <<schema>>
        +TaskStatus status
        +int priority
        +string executor_type
        +string executor_instance
        +List~string~ errors
    }

    class TaskUpdate {
        <<schema>>
        +datetime created_at
        +datetime updated_at
        +TaskType task_type
        +string origin_instance
        +string origin_zone
        +string origin_tenant_id
        +datetime started_at
        +datetime completed_at
        +string celery_task_id
        +string service_task_id
        +int retry_count
        +float progress
        +bool is_resumed
        +int resume_attempt_count
        +datetime last_resume_attempt
    }

    class CreateGenerationTask {
        <<schema>>
        +string subject_model_path
        +string product_model_path
        +string style_model_path
        +string product2_model_path
        +float subject_lora_weight
        +float product_lora_weight
        +float style_lora_weight
        +float product2_lora_weight
        +string prompt
        +int width
        +int height
        +int seed
    }

    class GenerationTaskPublic {
        <<schema>>
        +UUID backend_generation_id
        +UUID backend_prediction_id
        +string subject_model_path
        +string product_model_path
        +string style_model_path
        +string product2_model_path
        +float subject_lora_weight
        +float product_lora_weight
        +float style_lora_weight
        +float product2_lora_weight
        +string workflow_id
        +string prompt
        +int width
        +int height
        +int seed
        +string result_image_path
        +List~string~ comfyui_retry_server_urls
    }

    class GenerationTaskPublicUpdate {
        <<schema>>
    }

    class GenerationTaskUpdate {
        <<schema>>
        +UUID backend_generation_id
        +UUID backend_prediction_id
        +string subject_model_path
        +string product_model_path
        +string style_model_path
        +string product2_model_path
        +float subject_lora_weight
        +float product_lora_weight
        +float style_lora_weight
        +float product2_lora_weight
        +string workflow_id
        +string comfy_prompt_id
        +string prompt
        +int width
        +int height
        +int seed
        +string result_image_path
        +string comfy_server_url
        +string comfy_workflow_data
        +List~string~ comfyui_retry_server_urls
    }

    %% Service Classes
    class BaseTaskCRUD~T, CreateSchemaType, UpdateSchemaType, PublicSchemaType~ {
        <<abstract>>
        #Task model
        +create(db, create_schema) T
        +get(db, task_id) T
        +get_multi(db, skip, limit, filters) List~T~
        +update(db, task_id, update_data) T
        +delete(db, task_id) T
        +update_status(db, task_id, status) T
        #_create_entity_from_schema(create_schema) T
        #_update_entity_from_schema(entity, update_schema) T
    }

    class GenerationTaskCRUD {
        <<service>>
        +get_by_workflow(db, workflow_id, limit) List~GenerationTask~
        +get_by_status_and_workflow(db, status, workflow_id, limit) List~GenerationTask~
        +get_by_backend_prediction_id(db, backend_prediction_id) GenerationTask
        +get_pending_tasks(db, queue_name, limit) List~GenerationTask~
        +update_result(db, task_id, result_image_path, status) GenerationTask
    }

    class BaseTaskProcessor~T, C~ {
        <<abstract>>
        +process(task, crud)* void
        #_set_status(task, crud, status, progress, error_message) void
    }

    class GenerationTaskProcessor {
        <<service>>
        -image_storage
        -settings
        +process(task, crud) void
        -_resume_comfyui_task(task, crud) void
        -_start_fresh_execution(task, crud) void
        -_check_comfyui_status(task) dict
        -_execute_generation_task(task, crud) void
        -_handle_comfyui_completion(task, crud, comfy_status) void
        -_handle_comfyui_failure(task, crud, comfy_status) void
        -_handle_resume_failure(task, crud, error_msg) void
        -_save_result_image(task, result, comfy_service) string
    }

    class BaseTaskScheduler {
        <<service>>
        +schedule_task(task, priority) string
        +reschedule_task(task) string
        +cancel_task(task_id, publish_to_redis) bool
    }

    class GenerationTaskScheduler {
        <<service>>
        +cancel_task(task_id) bool
    }

    %% Configuration Classes
    class TaskHandlerComponents {
        <<interface>>
        +crud() BaseTaskCRUD
        +processor() BaseTaskProcessor
        +scheduler() BaseTaskScheduler
        +Type~BaseModel~ create_schema
        +Type~BaseModel~ public_schema
        +Type~BaseModel~ public_update_schema
        +Type~BaseModel~ update_schema
    }

    class DispatchMap {
        <<configuration>>
        +Dict~TaskType, TaskHandlerComponents~ map
        +get_handlers(task_type) TaskHandlerComponents
    }

    %% API Layer
    class TasksAPI {
        <<controller>>
        +create_task(task_data) TaskPublicAPI
        +list_tasks(filters) List~TaskPublicAPI~
        +get_task(task_id) TaskPublicAPI
        +update_task(task_id, update_data) TaskPublicAPI
        +delete_task(task_id) TaskPublicAPI
        +cancel_task(task_id) TaskPublicAPI
    }

    %% Queue Layer
    class CeleryTask {
        <<queue>>
        +process_task(task_id) void
    }

    class TaskProcessor {
        <<service>>
        +process_task_business_logic(task_id) void
    }

    %% Relationships - Inheritance
    Base <|-- Task : inherits
    Task <|-- GenerationTask : polymorphic inheritance

    TaskBase <|-- CreateTask : extends
    TaskBase <|-- TaskPublic : extends
    CreateTask <|-- CreateGenerationTask : extends
    TaskPublic <|-- GenerationTaskPublic : extends
    TaskPublicUpdate <|-- GenerationTaskPublicUpdate : extends
    TaskUpdate <|-- GenerationTaskUpdate : extends
    TaskPublicUpdate <|-- TaskUpdate : extends

    BaseTaskCRUD <|-- GenerationTaskCRUD : implements
    BaseTaskProcessor <|-- GenerationTaskProcessor : implements
    BaseTaskScheduler <|-- GenerationTaskScheduler : extends

    %% Relationships - Composition
    Task "1" *-- "1" TaskType : has
    Task "1" *-- "1" TaskStatus : has
    GenerationTaskCRUD ..> GenerationTask : manages
    GenerationTaskProcessor ..> GenerationTask : processes
    GenerationTaskScheduler ..> GenerationTask : schedules

    %% Relationships - Usage
    TaskHandlerComponents ..> BaseTaskCRUD : creates
    TaskHandlerComponents ..> BaseTaskProcessor : creates
    TaskHandlerComponents ..> BaseTaskScheduler : creates
    DispatchMap "1" *-- "*" TaskHandlerComponents : contains

    TasksAPI ..> DispatchMap : uses
    TasksAPI ..> CreateTask : validates
    TasksAPI ..> TaskPublic : returns
    TasksAPI ..> TaskPublicUpdate : accepts

    CeleryTask ..> TaskProcessor : delegates to
    TaskProcessor ..> DispatchMap : resolves services
    TaskProcessor ..> BaseTaskProcessor : executes

    %% Schema relationships
    GenerationTaskCRUD ..> CreateGenerationTask : accepts
    GenerationTaskCRUD ..> GenerationTaskUpdate : accepts
    GenerationTaskCRUD ..> GenerationTaskPublic : returns

Component Explanations

1. Model Layer (Entities)

Task (Abstract Base Entity)

  • Purpose: Provides common fields and polymorphic inheritance base
  • Key Fields: id, task_type, status, priority, origin_instance, timestamps
  • Polymorphic Configuration: Uses task_type as discriminator column
  • Database Strategy: Joined-table inheritance; shared fields live in the base task table, and each task type stores its specific fields in its own joined table

GenerationTask (Concrete Entity)

  • Purpose: Extends Task with generation-specific fields
  • Key Fields: Model paths, LoRA weights, prompt, dimensions, result path
  • Inheritance: Inherits all base task functionality from Task
  • Storage: Additional fields stored in separate generation_tasks table
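
The storage split described above (shared columns in a base table, generation-specific columns in generation_tasks) can be sketched with the standard-library sqlite3 module. This is only an illustration under stated assumptions: the base table name `tasks`, the reduced column set, and the helper functions are hypothetical, and the real system presumably maps these tables through an ORM.

```python
import sqlite3
import uuid

# Assumed names: "tasks" for the shared columns (hypothetical) and
# "generation_tasks" for the generation-specific columns (named in the text).
SCHEMA = """
CREATE TABLE tasks (
    id TEXT PRIMARY KEY,
    task_type TEXT NOT NULL,   -- discriminator column
    status TEXT NOT NULL,
    priority INTEGER NOT NULL
);
CREATE TABLE generation_tasks (
    id TEXT PRIMARY KEY REFERENCES tasks(id),
    prompt TEXT,
    width INTEGER,
    height INTEGER
);
"""


def insert_generation_task(conn, prompt, width, height, priority=0):
    """Insert one row into each table, sharing the same primary key."""
    task_id = str(uuid.uuid4())
    with conn:  # commit both inserts together, or roll both back
        conn.execute(
            "INSERT INTO tasks (id, task_type, status, priority) VALUES (?, ?, ?, ?)",
            (task_id, "GENERATION", "PENDING", priority),
        )
        conn.execute(
            "INSERT INTO generation_tasks (id, prompt, width, height) VALUES (?, ?, ?, ?)",
            (task_id, prompt, width, height),
        )
    return task_id


def load_generation_task(conn, task_id):
    """Reassemble the full entity by joining the base and child tables."""
    row = conn.execute(
        "SELECT t.id, t.task_type, t.status, t.priority, g.prompt, g.width, g.height "
        "FROM tasks t JOIN generation_tasks g ON g.id = t.id WHERE t.id = ?",
        (task_id,),
    ).fetchone()
    if row is None:
        return None
    keys = ("id", "task_type", "status", "priority", "prompt", "width", "height")
    return dict(zip(keys, row))


conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
task_id = insert_generation_task(conn, "a red chair", 512, 768)
task = load_generation_task(conn, task_id)
```

Sharing the primary key between the two tables keeps the base row usable on its own (for listing or cancelling any task) while the join is needed only when generation-specific fields are read.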

Enums (TaskStatus, TaskType)

  • TaskStatus: Defines task lifecycle states (PENDING → EXECUTING → COMPLETED/FAILED; CANCELLED when a task is cancelled)
  • TaskType: Registry of available task types (currently GENERATION)
  • Usage: Referenced by Task entity and used for service dispatch
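
As a rough sketch, the two enums and the lifecycle could be expressed as follows. The member names come from the class diagram; the string values and the transition table are assumptions (the document states only the main path and that queued or running tasks can be cancelled), and `can_transition` is a hypothetical helper.

```python
from enum import Enum


class TaskType(str, Enum):
    GENERATION = "GENERATION"


class TaskStatus(str, Enum):
    PENDING = "PENDING"
    EXECUTING = "EXECUTING"
    COMPLETED = "COMPLETED"
    FAILED = "FAILED"
    CANCELLED = "CANCELLED"


# Assumed transition table: terminal states have no outgoing transitions.
ALLOWED_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.EXECUTING, TaskStatus.CANCELLED},
    TaskStatus.EXECUTING: {
        TaskStatus.COMPLETED,
        TaskStatus.FAILED,
        TaskStatus.CANCELLED,
    },
    TaskStatus.COMPLETED: set(),
    TaskStatus.FAILED: set(),
    TaskStatus.CANCELLED: set(),
}


def can_transition(current: TaskStatus, new: TaskStatus) -> bool:
    """Return True if moving from `current` to `new` is allowed."""
    return new in ALLOWED_TRANSITIONS[current]
```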

2. Service Layer

CRUD Services (BaseTaskCRUD → GenerationTaskCRUD)

BaseTaskCRUD (Generic Abstract Base)

  • Responsibility: Common database operations for all task types
  • Generic Type: BaseTaskCRUD[T, CreateSchemaType, UpdateSchemaType, PublicSchemaType]
  • Key Methods:
      • create(): Convert schema to entity and persist
      • get()/get_multi(): Retrieve tasks with filtering
      • update(): Apply updates with validation
      • delete(): Remove tasks
      • update_status(): Status updates with timestamp management
  • Extension Points:
      • _create_entity_from_schema(): Custom schema-to-entity conversion
      • _update_entity_from_schema(): Custom update logic

GenerationTaskCRUD (Concrete Implementation)

  • Inheritance: Extends BaseTaskCRUD with GenerationTask-specific operations
  • Additional Methods:
      • get_by_workflow(): Filter by workflow type
      • get_by_backend_prediction_id(): Find tasks by external IDs
      • update_result(): Update with generated image path
  • Customization: Overrides _create_entity_from_schema() to set default workflow
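
The relationship between the generic base and its concrete subclass can be sketched as a template-method pattern. This is a simplified illustration, not the project's code: a plain dict stands in for the database session, only two of the four type parameters are modelled, statuses are plain strings, and `DEFAULT_WORKFLOW` is a hypothetical value.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Generic, Optional, TypeVar
from uuid import UUID, uuid4


@dataclass
class Task:
    id: UUID = field(default_factory=uuid4)
    status: str = "PENDING"
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None


@dataclass
class GenerationTask(Task):
    prompt: str = ""
    workflow_id: str = ""


@dataclass
class CreateGenerationTask:
    prompt: str


T = TypeVar("T", bound=Task)
CreateSchemaType = TypeVar("CreateSchemaType")
Store = Dict[UUID, Any]  # stand-in for a database session (assumption)


class BaseTaskCRUD(ABC, Generic[T, CreateSchemaType]):
    def create(self, db: Store, create_schema: CreateSchemaType) -> T:
        # Conversion is delegated to the subclass; persistence is shared.
        entity = self._create_entity_from_schema(create_schema)
        db[entity.id] = entity
        return entity

    def get(self, db: Store, task_id: UUID) -> Optional[T]:
        return db.get(task_id)

    def update_status(self, db: Store, task_id: UUID, status: str) -> Optional[T]:
        entity = self.get(db, task_id)
        if entity is None:
            return None
        now = datetime.now(timezone.utc)
        # Timestamp management: set started_at once, completed_at on terminal states.
        if status == "EXECUTING" and entity.started_at is None:
            entity.started_at = now
        elif status in ("COMPLETED", "FAILED", "CANCELLED"):
            entity.completed_at = now
        entity.status = status
        return entity

    @abstractmethod
    def _create_entity_from_schema(self, create_schema: CreateSchemaType) -> T:
        """Extension point: build the entity for a specific task type."""


class GenerationTaskCRUD(BaseTaskCRUD[GenerationTask, CreateGenerationTask]):
    DEFAULT_WORKFLOW = "default"  # hypothetical default workflow identifier

    def _create_entity_from_schema(
        self, create_schema: CreateGenerationTask
    ) -> GenerationTask:
        return GenerationTask(
            prompt=create_schema.prompt, workflow_id=self.DEFAULT_WORKFLOW
        )
```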

Processor Services (BaseTaskProcessor → GenerationTaskProcessor)

BaseTaskProcessor (Abstract Base)

  • Responsibility: Task execution with progress tracking and event publishing
  • Generic Type: BaseTaskProcessor[T, C] where T=Task, C=CRUD
  • Key Methods:
      • process(): Abstract method for business logic (must implement)
      • _set_status(): Unified status updates with Redis events
  • Features:
      • Automatic timestamp management (started_at, completed_at)
      • Redis stream publishing for real-time updates
      • Error handling and progress tracking

GenerationTaskProcessor (Concrete Implementation)

  • Inheritance: Extends BaseTaskProcessor for generation-specific logic
  • Dependencies: ComfyUI service, image storage, workflow definitions
  • Process Flow:
      1. Adapt task data to workflow parameters
      2. Execute ComfyUI workflow with progress tracking
      3. Download and save result images
      4. Update task with final results
  • Progress Tracking: Uses _set_status() throughout process for real-time updates
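
A minimal sketch of the processor contract follows. It makes several assumptions for brevity: the `crud` argument from the diagram's signatures is omitted, a plain callable (`publish`) stands in for the Redis stream publisher, `run_workflow` is a hypothetical stand-in for the ComfyUI call, and progress is assumed to run from 0.0 to 1.0.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional

Event = Dict[str, object]


@dataclass
class Task:
    id: str
    status: str = "PENDING"
    progress: float = 0.0
    errors: List[str] = field(default_factory=list)
    result_image_path: Optional[str] = None


class BaseTaskProcessor(ABC):
    def __init__(self, publish: Callable[[Event], None]) -> None:
        # `publish` stands in for the Redis stream publisher (assumption).
        self._publish = publish

    @abstractmethod
    def process(self, task: Task) -> None:
        """Run the task-specific business logic."""

    def _set_status(
        self,
        task: Task,
        status: str,
        progress: Optional[float] = None,
        error_message: Optional[str] = None,
    ) -> None:
        # Single place where status, progress, and errors change,
        # so every change also produces exactly one event.
        task.status = status
        if progress is not None:
            task.progress = progress
        if error_message is not None:
            task.errors.append(error_message)
        self._publish(
            {"task_id": task.id, "status": status, "progress": task.progress}
        )


class GenerationTaskProcessor(BaseTaskProcessor):
    def __init__(
        self,
        publish: Callable[[Event], None],
        run_workflow: Callable[[Task], str],
    ) -> None:
        super().__init__(publish)
        self._run_workflow = run_workflow  # hypothetical ComfyUI call

    def process(self, task: Task) -> None:
        self._set_status(task, "EXECUTING", progress=0.0)
        try:
            task.result_image_path = self._run_workflow(task)
        except Exception as exc:
            self._set_status(task, "FAILED", error_message=str(exc))
            return
        self._set_status(task, "COMPLETED", progress=1.0)
```

Routing every change through `_set_status()` is what keeps the stored status and the published events from drifting apart.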

Scheduler Services (BaseTaskScheduler → GenerationTaskScheduler)

BaseTaskScheduler (Base Implementation)

  • Responsibility: Queue management and task lifecycle control
  • Key Methods:
      • schedule_task(): Submit task to Celery queue
      • cancel_task(): Cancel queued/running tasks
  • Features:
      • Celery integration with queue routing
      • Task cancellation with status updates
      • Redis event publishing for cancellations

GenerationTaskScheduler (Enhanced Implementation)

  • Additional Features: ComfyUI-specific cancellation
  • Enhanced Cancellation:
      • Cancels both Celery task and ComfyUI prompt
      • Handles multiple ComfyUI servers
      • Graceful degradation if ComfyUI unavailable
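
The enhanced cancellation could look roughly like the sketch below. The injected callables `revoke` and `interrupt` are hypothetical stand-ins for the Celery and ComfyUI clients, and `cancel_task` takes the task object directly instead of looking it up by `task_id` as the diagram's signature suggests. Treating `ConnectionError` as the "ComfyUI unavailable" signal is also an assumption.

```python
import logging
from dataclasses import dataclass, field
from typing import Callable, List

logger = logging.getLogger(__name__)


@dataclass
class GenerationTask:
    celery_task_id: str
    comfy_prompt_id: str
    comfy_server_url: str
    comfyui_retry_server_urls: List[str] = field(default_factory=list)


class BaseTaskScheduler:
    def __init__(self, revoke: Callable[[str], None]) -> None:
        # `revoke` stands in for the Celery revoke call (assumption).
        self._revoke = revoke

    def cancel_task(self, task: GenerationTask) -> bool:
        self._revoke(task.celery_task_id)
        return True


class GenerationTaskScheduler(BaseTaskScheduler):
    def __init__(
        self,
        revoke: Callable[[str], None],
        interrupt: Callable[[str, str], None],
    ) -> None:
        super().__init__(revoke)
        # `interrupt(server_url, prompt_id)` is a hypothetical ComfyUI client call.
        self._interrupt = interrupt

    def cancel_task(self, task: GenerationTask) -> bool:
        # Cancel the queue entry first, then every ComfyUI server the task touched.
        cancelled = super().cancel_task(task)
        servers = [task.comfy_server_url, *task.comfyui_retry_server_urls]
        for url in servers:
            try:
                self._interrupt(url, task.comfy_prompt_id)
            except ConnectionError as exc:
                # Graceful degradation: an unreachable server does not fail the cancel.
                logger.warning("ComfyUI server %s unreachable: %s", url, exc)
        return cancelled
```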

3. Configuration Layer

DispatchMap

  • Purpose: Central service registry mapping task types to their implementations
  • Structure: Dict[TaskType, TaskHandlerComponents]
  • Components Per Task Type:
      • CRUD factory function
      • Processor factory function
      • Scheduler factory function
      • Schema type definitions (create, public, public update, update)
  • Benefits: Type-safe service resolution, easy task type registration

TaskHandlerComponents (TypedDict)

  • Purpose: Type-safe configuration structure for each task type
  • Fields: All required service factories and schema types
  • Validation: Static type checking via TypedDict (enforced by a type checker such as mypy, not at runtime)
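
A registry of this shape might be sketched as follows. The field names mirror the class diagram, but the empty placeholder classes, the `DISPATCH_MAP` constant name, and the error raised for an unregistered type are assumptions made so the example runs on its own.

```python
from enum import Enum
from typing import Callable, Dict, Type, TypedDict


class TaskType(str, Enum):
    GENERATION = "GENERATION"


# Empty placeholders for the real services and schemas.
class GenerationTaskCRUD: ...
class GenerationTaskProcessor: ...
class GenerationTaskScheduler: ...
class CreateGenerationTask: ...
class GenerationTaskPublic: ...
class GenerationTaskPublicUpdate: ...
class GenerationTaskUpdate: ...


class TaskHandlerComponents(TypedDict):
    crud: Callable[[], object]
    processor: Callable[[], object]
    scheduler: Callable[[], object]
    create_schema: Type[object]
    public_schema: Type[object]
    public_update_schema: Type[object]
    update_schema: Type[object]


# A type checker flags a missing or misspelled key in this literal;
# nothing is validated when the module is imported.
DISPATCH_MAP: Dict[TaskType, TaskHandlerComponents] = {
    TaskType.GENERATION: {
        "crud": GenerationTaskCRUD,
        "processor": GenerationTaskProcessor,
        "scheduler": GenerationTaskScheduler,
        "create_schema": CreateGenerationTask,
        "public_schema": GenerationTaskPublic,
        "public_update_schema": GenerationTaskPublicUpdate,
        "update_schema": GenerationTaskUpdate,
    },
}


def get_handlers(task_type: TaskType) -> TaskHandlerComponents:
    """Resolve the services and schemas registered for a task type."""
    try:
        return DISPATCH_MAP[task_type]
    except KeyError:
        raise ValueError(f"No handlers registered for {task_type!r}") from None
```

Storing factories instead of instances means each caller gets a fresh service, which avoids sharing state between requests and workers.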

4. API Layer

TasksAPI

  • Purpose: REST endpoints for task management
  • Service Resolution: Uses DispatchMap to get appropriate services
  • Endpoints:
      • Generic: CRUD operations for all task types
      • Specific: Type-specific operations (e.g., /generation/)
  • Error Handling: Comprehensive exception handling with appropriate HTTP status codes

Interaction Patterns

1. Task Creation Flow

API → DispatchMap → CRUD Service → Database
API → DispatchMap → Scheduler Service → Celery Queue

2. Task Processing Flow

Celery → TaskProcessor → DispatchMap → Specific Processor
Processor → CRUD Service (status updates)
Processor → Redis Streams (events)
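
The processing flow can be traced end to end with in-memory stand-ins. Everything here besides the function names `process_task` and `process_task_business_logic` is hypothetical: `ROWS` replaces the database, `EVENTS` replaces the Redis stream, and `process_task` is a plain function where the real system would register a Celery task.

```python
from typing import Any, Callable, Dict, List, Tuple

Row = Dict[str, str]

# Stand-ins for the database and the Redis stream (assumptions).
ROWS: Dict[str, Row] = {
    "t1": {"id": "t1", "task_type": "GENERATION", "status": "PENDING"},
}
EVENTS: List[Tuple[str, str]] = []


class InMemoryCRUD:
    def get(self, task_id: str) -> Row:
        return ROWS[task_id]

    def update_status(self, task_id: str, status: str) -> None:
        ROWS[task_id]["status"] = status


class DemoProcessor:
    def process(self, task: Row, crud: InMemoryCRUD) -> None:
        for status in ("EXECUTING", "COMPLETED"):
            crud.update_status(task["id"], status)  # Processor → CRUD Service
            EVENTS.append((task["id"], status))     # Processor → Redis Streams


DISPATCH_MAP: Dict[str, Dict[str, Callable[[], Any]]] = {
    "GENERATION": {"crud": InMemoryCRUD, "processor": DemoProcessor},
}


def process_task_business_logic(task_id: str) -> None:
    """TaskProcessor step: resolve services by task type, then run the processor."""
    task_type = ROWS[task_id]["task_type"]
    handlers = DISPATCH_MAP[task_type]
    crud = handlers["crud"]()
    processor = handlers["processor"]()
    processor.process(crud.get(task_id), crud)


def process_task(task_id: str) -> None:
    """Queue entry point; it only delegates, so the logic stays testable without a worker."""
    process_task_business_logic(task_id)


process_task("t1")
```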

3. Service Resolution Pattern

Component → DispatchMap[task_type] → TaskHandlerComponents → Service Instance

Key Design Benefits

1. Extensibility

  • Adding Task Types: Requires only three services (CRUD, processor, scheduler) plus a DispatchMap entry
  • No Existing Code Changes: New types don't affect existing functionality
  • Type Safety: Static type checking of service configurations

2. Separation of Concerns

  • CRUD: Pure database operations
  • Processor: Business logic with progress tracking
  • Scheduler: Queue management and lifecycle control
  • API: HTTP interface and validation

3. Testability

  • Service Isolation: Each service can be tested independently
  • Dependency Injection: Services can be mocked for testing
  • Clear Interfaces: Abstract base classes define contracts
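
To illustrate the testability claim, the sketch below mocks the CRUD and scheduler services with the standard-library unittest.mock. `TaskCreationService` is a hypothetical helper that mirrors the task creation flow; only the `create(db, create_schema)` and `schedule_task(task, priority)` signatures are taken from the class diagram.

```python
from unittest.mock import Mock


class TaskCreationService:
    """Hypothetical helper: persist a task, then hand it to the scheduler."""

    def __init__(self, crud, scheduler):
        self._crud = crud
        self._scheduler = scheduler

    def create_and_schedule(self, db, create_schema):
        task = self._crud.create(db, create_schema)
        self._scheduler.schedule_task(task, task.priority)
        return task


def test_create_and_schedule_uses_both_services():
    # Both collaborators are mocks, so no database or queue is needed.
    crud = Mock()
    crud.create.return_value = Mock(priority=5)
    scheduler = Mock()
    service = TaskCreationService(crud, scheduler)

    task = service.create_and_schedule("db-session", {"prompt": "a red chair"})

    crud.create.assert_called_once_with("db-session", {"prompt": "a red chair"})
    scheduler.schedule_task.assert_called_once_with(task, 5)
```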

4. Maintainability

  • Single Responsibility: Each component has a focused purpose
  • Configuration-Driven: Easy to understand service relationships
  • Consistent Patterns: All task types follow the same architecture

This architecture provides a robust, scalable foundation for task management while maintaining code clarity and enabling easy extension for new task types.