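```python
"""
Basic task-related data models.

Defines TaskFileInfo and TaskChain, decoupled from any concrete workflow
implementation.

Design principles:
1. Separation of concerns: TaskFileInfo holds the business data; TaskChain
   tracks execution state.
2. Data isolation: TaskChain references TaskFileInfo instead of storing a
   second copy of its data.
3. Type safety: explicitly defined attributes with type annotations.
4. Standalone module: other modules can import it on its own, which avoids
   circular dependencies.
"""
from typing import Dict, Optional
from datetime import datetime

from foundation.observability.logger.loggering import server_logger as logger
```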
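The constructor of `TaskFileInfo` below accepts a plain `dict`. As a sketch of how the type-safety principle could extend to that input, the expected payload can be described with a `typing.TypedDict`. The class names are hypothetical, and the value types are inferred from the defaults in `TaskFileInfo.__init__` (the source does not state the type or unit of `launched_at`). A `TypedDict` is checked only by static type checkers such as mypy; at runtime it is an ordinary `dict`, so missing required keys are still caught by the `KeyError` the constructor raises. `__required_keys__` needs Python 3.9 or newer.

```python
from typing import List, TypedDict


class _RequiredTaskKeys(TypedDict):
    # Read with file_info[...] in TaskFileInfo.__init__: a missing key raises KeyError.
    file_id: str
    user_id: str
    callback_task_id: str


class TaskFileInfoPayload(_RequiredTaskKeys, total=False):
    # Read with file_info.get(...) in TaskFileInfo.__init__; defaults noted per key.
    file_name: str             # default ''
    file_type: str             # default ''
    file_content: bytes        # default b''
    review_config: List[str]   # default []
    project_plan_type: str     # default ''
    tendency_review_role: str  # default ''
    launched_at: float         # default 0 (assumed numeric; unit not stated in the source)


# Hypothetical payload: only the required keys plus one optional key.
payload: TaskFileInfoPayload = {
    'file_id': 'f-001',
    'user_id': 'u-42',
    'callback_task_id': 't-7',
    'review_config': ['sensitive_words'],
}

print(sorted(TaskFileInfoPayload.__required_keys__))
# ['callback_task_id', 'file_id', 'user_id']
```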
```python
class TaskFileInfo:
    """Task file information: wraps the file and context data of a single task."""

    def __init__(self, file_info: dict):
        """
        Initialize the task file information.

        Args:
            file_info: Dictionary containing the file and task information.
                'file_id', 'user_id' and 'callback_task_id' are required
                (a missing key raises KeyError); all other keys are optional.
        """
        # Core identifiers
        self.file_id = file_info['file_id']
        self.user_id = file_info['user_id']
        self.callback_task_id = file_info['callback_task_id']

        # Basic file information
        self.file_name = file_info.get('file_name', '')
        self.file_type = file_info.get('file_type', '')
        self.file_content = file_info.get('file_content', b'')

        # Review configuration
        self.review_config = file_info.get('review_config', [])
        self.project_plan_type = file_info.get('project_plan_type', '')
        self.tendency_review_role = file_info.get('tendency_review_role', '')

        # Timestamp
        self.launched_at = file_info.get('launched_at', 0)

        # Keep the complete raw data for read-only access. This is a shallow
        # copy: it guards the top-level keys against accidental modification,
        # but nested objects such as the review_config list are still shared.
        self._file_info = file_info.copy()

        logger.debug(f"Created TaskFileInfo: {self.file_id} for user {self.user_id}")

    def get_file_info(self) -> dict:
        """Return the complete raw file information (a shallow copy)."""
        return self._file_info.copy()

    def get_review_config_list(self) -> list:
        """Return a copy of the review configuration list."""
        return self.review_config.copy()

    def get_project_plan_type(self) -> str:
        """Return the engineering plan type."""
        return self.project_plan_type

    def get_tendency_review_role(self) -> str:
        """Return the tendency review role."""
        return self.tendency_review_role

    def has_review_type(self, review_type: str) -> bool:
        """Check whether the given review type is configured."""
        return review_type in self.review_config

    def get_file_size(self) -> int:
        """Return the file size in bytes."""
        return len(self.file_content) if self.file_content else 0

    def __str__(self) -> str:
        """String representation."""
        return f"TaskFileInfo(file_id={self.file_id}, user_id={self.user_id})"

    def __repr__(self) -> str:
        """Debug string representation."""
        return f"TaskFileInfo(file_id='{self.file_id}', user_id='{self.user_id}', task_id='{self.callback_task_id}')"
```
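The sketch below exercises the `TaskFileInfo` constructor contract with a condensed stand-in, `TaskFileInfoSketch` (a hypothetical name), because the real class imports the project's `server_logger`, which is unavailable outside the repository. The payload values and the review types `sensitive_words` and `completeness` are made up for illustration. It shows that a missing required key raises `KeyError`, and that because `dict.copy()` is shallow, mutating the `review_config` list inside the dict returned by `get_file_info()` also changes what `has_review_type()` sees.

```python
from typing import Any, Dict


class TaskFileInfoSketch:
    """Hypothetical condensed stand-in for TaskFileInfo (logging omitted)."""

    def __init__(self, file_info: Dict[str, Any]):
        # Required keys: indexing raises KeyError if any is missing.
        self.file_id = file_info['file_id']
        self.user_id = file_info['user_id']
        self.callback_task_id = file_info['callback_task_id']
        # Optional keys fall back to the same defaults as TaskFileInfo.
        self.file_content = file_info.get('file_content', b'')
        self.review_config = file_info.get('review_config', [])
        # Shallow copy: top-level keys are protected, nested objects are shared.
        self._file_info = file_info.copy()

    def get_file_info(self) -> Dict[str, Any]:
        return self._file_info.copy()

    def has_review_type(self, review_type: str) -> bool:
        return review_type in self.review_config

    def get_file_size(self) -> int:
        return len(self.file_content) if self.file_content else 0


# Hypothetical payload; the keys match those read by TaskFileInfo.__init__.
payload = {
    'file_id': 'f-001',
    'user_id': 'u-42',
    'callback_task_id': 't-7',
    'file_content': b'%PDF-1.7',
    'review_config': ['sensitive_words'],
}
info = TaskFileInfoSketch(payload)
print(info.get_file_size())                     # 8
print(info.has_review_type('sensitive_words'))  # True

# Replacing a top-level key in the returned copy leaves the stored data intact...
snapshot = info.get_file_info()
snapshot['file_id'] = 'changed'
print(info.get_file_info()['file_id'])          # f-001

# ...but mutating the nested list does not, because both copies are shallow.
snapshot['review_config'].append('completeness')
print(info.has_review_type('completeness'))     # True

# A payload without a required key fails immediately.
try:
    TaskFileInfoSketch({'file_id': 'f-002', 'user_id': 'u-42'})
except KeyError as exc:
    print(f"missing key: {exc}")                # missing key: 'callback_task_id'
```

If a fully isolated snapshot were ever needed, `copy.deepcopy` would be one option; the `copy` module treats `bytes` as immutable, so a large `file_content` would not be duplicated.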
```python
class TaskChain:
    """Task chain: tracks workflow execution state.

    References a TaskFileInfo instead of duplicating its data.
    """

    def __init__(self, task_file_info: TaskFileInfo):
        """
        Initialize the task chain with a reference to a TaskFileInfo (its data is not copied).

        Args:
            task_file_info: The task file information object.
        """
        # Reference to the task file information (avoids storing the data twice)
        self.task_info = task_file_info

        # Execution state: a new chain starts as "pending" in the "created" stage
        self.status: str = "pending"  # pending, processing, completed, failed
        self.current_stage: str = "created"
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.results: Dict = {}

        logger.info(f"Created task chain: {self.file_id} for user {self.user_id}")

    # Property proxies to the referenced TaskFileInfo, kept for backward compatibility
    @property
    def callback_task_id(self) -> str:
        """Callback task ID."""
        return self.task_info.callback_task_id

    @property
    def file_id(self) -> str:
        """File ID."""
        return self.task_info.file_id

    @property
    def user_id(self) -> str:
        """User ID."""
        return self.task_info.user_id

    # Convenience methods
    def start_processing(self):
        """Mark the task as started."""
        self.started_at = datetime.now()
        self.status = "processing"
        self.current_stage = "document_processing"  # initial stage once processing starts
        logger.info(f"Task processing started: {self.file_id}")

    def complete_processing(self):
        """Mark the task as completed."""
        self.completed_at = datetime.now()
        self.status = "completed"
        logger.info(f"Task processing completed: {self.file_id}")

    def fail_processing(self, error_message: str = ""):
        """Mark the task as failed."""
        self.completed_at = datetime.now()
        self.status = "failed"
        logger.error(f"Task processing failed: {self.file_id}, error: {error_message}")

    def update_stage(self, stage: str):
        """Update the current processing stage."""
        self.current_stage = stage
        logger.debug(f"Task stage updated: {self.file_id} -> {stage}")

    def __str__(self) -> str:
        """String representation."""
        return f"TaskChain(file_id={self.file_id}, status={self.status}, stage={self.current_stage})"

    def __repr__(self) -> str:
        """Debug string representation."""
        return f"TaskChain(file_id='{self.file_id}', status='{self.status}', stage='{self.current_stage}', created='{self.created_at}')"
```
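A condensed, logging-free stand-in makes the reference-based design of `TaskChain` and its status lifecycle visible. `FileRef` and `TaskChainSketch` are hypothetical names; `FileRef` replaces `TaskFileInfo` with only the three identifier fields, and `ai_review` is an invented stage name. The sketch mirrors the original methods, so it inherits their behavior: `complete_processing()` leaves `current_stage` unchanged, and `fail_processing()` can be called on a chain that never started, which leaves `started_at` as `None`.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional


@dataclass
class FileRef:
    """Hypothetical minimal stand-in for TaskFileInfo."""
    file_id: str
    user_id: str
    callback_task_id: str


class TaskChainSketch:
    """Hypothetical condensed stand-in for TaskChain (logging omitted)."""

    def __init__(self, task_file_info: FileRef):
        self.task_info = task_file_info  # stored by reference, not copied
        self.status: str = "pending"
        self.current_stage: str = "created"
        self.created_at = datetime.now()
        self.started_at: Optional[datetime] = None
        self.completed_at: Optional[datetime] = None
        self.results: Dict = {}

    @property
    def file_id(self) -> str:
        # Reads through to the referenced object on every access.
        return self.task_info.file_id

    def start_processing(self) -> None:
        self.started_at = datetime.now()
        self.status = "processing"
        self.current_stage = "document_processing"

    def update_stage(self, stage: str) -> None:
        self.current_stage = stage

    def complete_processing(self) -> None:
        self.completed_at = datetime.now()
        self.status = "completed"

    def fail_processing(self, error_message: str = "") -> None:
        self.completed_at = datetime.now()
        self.status = "failed"


info = FileRef(file_id="f-001", user_id="u-42", callback_task_id="t-7")
chain = TaskChainSketch(info)
print(chain.status, chain.current_stage)   # pending created

chain.start_processing()
chain.update_stage("ai_review")            # hypothetical stage name
chain.complete_processing()
print(chain.status, chain.current_stage)   # completed ai_review

# Both timestamps are now set, so the processing time can be derived from them.
elapsed = (chain.completed_at - chain.started_at).total_seconds()
print(f"processing took {elapsed:.6f} s")

# The chain holds a reference, so a change to the source object shows through.
info.file_id = "f-001-renamed"
print(chain.file_id)                       # f-001-renamed

# Failing a chain that never started: no transition check, started_at stays None.
failed = TaskChainSketch(FileRef("f-002", "u-42", "t-8"))
failed.fail_processing("parse error")
print(failed.status, failed.started_at)    # failed None
```

The rename near the end exists only to make the reference semantics observable; it is not a pattern the original module suggests.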