""" 任务相关的基础数据模型 包含 TaskFileInfo 和 TaskChain,与具体工作流实现解耦 设计原则: 1. 职责分离:TaskFileInfo 专注业务数据,TaskChain 专注执行状态 2. 数据隔离:通过引用避免重复存储 3. 类型安全:明确的属性定义和类型注解 4. 独立模块:可被其他模块独立导入,避免循环依赖 """ from typing import Dict, Optional from datetime import datetime from foundation.observability.logger.loggering import server_logger as logger class TaskFileInfo: """任务文件信息类 - 封装单个任务的文件和上下文信息""" def __init__(self, file_info: dict): """ 初始化任务文件信息 Args: file_info: 包含文件和任务信息的字典 """ # 核心标识信息 self.file_id = file_info['file_id'] self.user_id = file_info['user_id'] self.callback_task_id = file_info['callback_task_id'] # 文件基本信息 self.file_name = file_info.get('file_name', '') self.file_type = file_info.get('file_type', '') self.file_content = file_info.get('file_content', b'') # 审查配置信息 self.review_config = file_info.get('review_config', []) self.project_plan_type = file_info.get('project_plan_type', '') self.tendency_review_role = file_info.get('tendency_review_role', '') # 时间戳信息 self.launched_at = file_info.get('launched_at', 0) # 保存完整的原始数据(只读访问) self._file_info = file_info.copy() # 防止意外修改原始数据 logger.debug(f"创建TaskFileInfo: {self.file_id} for user {self.user_id}") def get_file_info(self) -> dict: """获取完整的原始文件信息(只读副本)""" return self._file_info.copy() def get_review_config_list(self) -> list: """获取审查配置列表""" return self.review_config.copy() def get_project_plan_type(self) -> str: """获取工程方案类型""" return self.project_plan_type def get_tendency_review_role(self) -> str: """获取倾向性审查角色""" return self.tendency_review_role def has_review_type(self, review_type: str) -> bool: """检查是否包含指定的审查类型""" return review_type in self.review_config def get_file_size(self) -> int: """获取文件大小(字节)""" return len(self.file_content) if self.file_content else 0 def __str__(self) -> str: """字符串表示""" return f"TaskFileInfo(file_id={self.file_id}, user_id={self.user_id})" def __repr__(self) -> str: """调试用的字符串表示""" return f"TaskFileInfo(file_id='{self.file_id}', user_id='{self.user_id}', task_id='{self.callback_task_id}')" class TaskChain: """任务链 - 工作流执行状态跟踪,通过引用TaskFileInfo避免数据重复""" def __init__(self, task_file_info: TaskFileInfo): """ 初始化任务链,引用 TaskFileInfo 避免数据重复 Args: task_file_info: 任务文件信息对象 """ # 引用任务文件信息(避免重复存储) self.task_info = task_file_info # 执行状态信息(创建时不预设状态) self.status: str = "pending" # pending, processing, completed, failed self.current_stage: str = "created" self.created_at = datetime.now() self.started_at: Optional[datetime] = None self.completed_at: Optional[datetime] = None self.results: Dict = {} logger.info(f"创建任务链: {self.file_id} for user {self.user_id}") # 属性访问代理,保持向后兼容 @property def callback_task_id(self) -> str: """获取回调任务ID""" return self.task_info.callback_task_id @property def file_id(self) -> str: """获取文件ID""" return self.task_info.file_id @property def user_id(self) -> str: """获取用户ID""" return self.task_info.user_id # 便捷方法 def start_processing(self): """标记任务开始处理""" self.started_at = datetime.now() self.status = "processing" self.current_stage = "document_processing" # 开始处理时设置初始阶段 logger.info(f"任务开始处理: {self.file_id}") def complete_processing(self): """标记任务处理完成""" self.completed_at = datetime.now() self.status = "completed" logger.info(f"任务处理完成: {self.file_id}") def fail_processing(self, error_message: str = ""): """标记任务处理失败""" self.completed_at = datetime.now() self.status = "failed" logger.error(f"任务处理失败: {self.file_id}, 错误: {error_message}") def update_stage(self, stage: str): """更新当前处理阶段""" self.current_stage = stage logger.debug(f"任务阶段更新: {self.file_id} -> {stage}") def __str__(self) -> str: """字符串表示""" return f"TaskChain(file_id={self.file_id}, status={self.status}, stage={self.current_stage})" def __repr__(self) -> str: """调试用的字符串表示""" return f"TaskChain(file_id='{self.file_id}', status='{self.status}', stage='{self.current_stage}', created='{self.created_at}')"