task_models.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. """
  2. 任务相关的基础数据模型
  3. 包含 TaskFileInfo 和 TaskChain,与具体工作流实现解耦
  4. 设计原则:
  5. 1. 职责分离:TaskFileInfo 专注业务数据,TaskChain 专注执行状态
  6. 2. 数据隔离:通过引用避免重复存储
  7. 3. 类型安全:明确的属性定义和类型注解
  8. 4. 独立模块:可被其他模块独立导入,避免循环依赖
  9. """
  10. from typing import Dict, Optional
  11. from datetime import datetime
  12. from foundation.observability.logger.loggering import server_logger as logger
  13. class TaskFileInfo:
  14. """任务文件信息类 - 封装单个任务的文件和上下文信息"""
  15. def __init__(self, file_info: dict):
  16. """
  17. 初始化任务文件信息
  18. Args:
  19. file_info: 包含文件和任务信息的字典
  20. """
  21. # 核心标识信息
  22. self.file_id = file_info['file_id']
  23. self.user_id = file_info['user_id']
  24. self.callback_task_id = file_info['callback_task_id']
  25. # 文件基本信息
  26. self.file_name = file_info.get('file_name', '')
  27. self.file_type = file_info.get('file_type', '')
  28. self.file_content = file_info.get('file_content', b'')
  29. # 审查配置信息(使用 .get() 提供默认值,支持键不存在的情况)
  30. review_config_raw = file_info.get('review_config')
  31. review_item_config_raw = file_info.get('review_item_config')
  32. # 类型校验:确保review_config是列表
  33. if isinstance(review_config_raw, list):
  34. self.review_config = review_config_raw
  35. else:
  36. # 如果不是列表,记录警告并使用默认空列表
  37. logger.warning(
  38. f"review_config类型错误,期望list,实际{type(review_config_raw).__name__},"
  39. f"值: {review_config_raw},将使用空列表"
  40. )
  41. self.review_config = []
  42. # 类型校验:确保review_item_config是列表
  43. if isinstance(review_item_config_raw, list):
  44. self.review_item_config = review_item_config_raw
  45. else:
  46. # 如果不是列表,记录警告并使用默认空列表
  47. logger.warning(
  48. f"review_item_config类型错误,期望list,实际{type(review_item_config_raw).__name__},"
  49. f"值: {review_item_config_raw},将使用空列表"
  50. )
  51. self.review_item_config = []
  52. self.project_plan_type = file_info.get('project_plan_type', '')
  53. self.tendency_review_role = file_info.get('tendency_review_role', '')
  54. self.test_designation_chunk_flag = file_info.get('test_designation_chunk_flag', '')
  55. # 时间戳信息
  56. self.launched_at = file_info.get('launched_at', 0)
  57. # 保存完整的原始数据(只读访问)
  58. self._file_info = file_info.copy() # 防止意外修改原始数据
  59. logger.debug(f"创建TaskFileInfo: {self.file_id} for user {self.user_id}")
  60. def get_file_info(self) -> dict:
  61. """获取完整的原始文件信息(只读副本)"""
  62. return self._file_info.copy()
  63. def get_review_config_list(self) -> list:
  64. """获取审查配置列表"""
  65. return self.review_config.copy()
  66. def get_review_item_config_list(self) -> list:
  67. """获取审查项配置列表(章节_审查维度格式)"""
  68. return self.review_item_config.copy()
  69. def get_project_plan_type(self) -> str:
  70. """获取工程方案类型"""
  71. return self.project_plan_type
  72. def get_tendency_review_role(self) -> str:
  73. """获取倾向性审查角色"""
  74. return self.tendency_review_role
  75. def get_test_designation_chunk_flag(self) -> str:
  76. """获取测试定位标志符"""
  77. return self.test_designation_chunk_flag
  78. def has_review_type(self, review_type: str) -> bool:
  79. """检查是否包含指定的审查类型"""
  80. return review_type in self.review_config
  81. def get_file_size(self) -> int:
  82. """获取文件大小(字节)"""
  83. return len(self.file_content) if self.file_content else 0
  84. def __str__(self) -> str:
  85. """字符串表示"""
  86. return f"TaskFileInfo(file_id={self.file_id}, user_id={self.user_id})"
  87. def __repr__(self) -> str:
  88. """调试用的字符串表示"""
  89. return f"TaskFileInfo(file_id='{self.file_id}', user_id='{self.user_id}', task_id='{self.callback_task_id}')"
  90. class TaskChain:
  91. """任务链 - 工作流执行状态跟踪,通过引用TaskFileInfo避免数据重复"""
  92. def __init__(self, task_file_info: TaskFileInfo):
  93. """
  94. 初始化任务链,引用 TaskFileInfo 避免数据重复
  95. Args:
  96. task_file_info: 任务文件信息对象
  97. """
  98. # 引用任务文件信息(避免重复存储)
  99. self.task_info = task_file_info
  100. # 执行状态信息(创建时不预设状态)
  101. self.status: str = "pending" # pending, processing, completed, failed
  102. self.current_stage: str = "created"
  103. self.created_at = datetime.now()
  104. self.started_at: Optional[datetime] = None
  105. self.completed_at: Optional[datetime] = None
  106. self.results: Dict = {}
  107. logger.info(f"创建任务链: {self.file_id} for user {self.user_id}")
  108. # 属性访问代理,保持向后兼容
  109. @property
  110. def callback_task_id(self) -> str:
  111. """获取回调任务ID"""
  112. return self.task_info.callback_task_id
  113. @property
  114. def file_id(self) -> str:
  115. """获取文件ID"""
  116. return self.task_info.file_id
  117. @property
  118. def user_id(self) -> str:
  119. """获取用户ID"""
  120. return self.task_info.user_id
  121. # 便捷方法
  122. def start_processing(self):
  123. """标记任务开始处理"""
  124. self.started_at = datetime.now()
  125. self.status = "processing"
  126. self.current_stage = "document_processing" # 开始处理时设置初始阶段
  127. logger.info(f"任务开始处理: {self.file_id}")
  128. def complete_processing(self):
  129. """标记任务处理完成"""
  130. self.completed_at = datetime.now()
  131. self.status = "completed"
  132. logger.info(f"任务处理完成: {self.file_id}")
  133. def fail_processing(self, error_message: str = ""):
  134. """标记任务处理失败"""
  135. self.completed_at = datetime.now()
  136. self.status = "failed"
  137. logger.error(f"任务处理失败: {self.file_id}, 错误: {error_message}")
  138. def update_stage(self, stage: str):
  139. """更新当前处理阶段"""
  140. self.current_stage = stage
  141. logger.debug(f"任务阶段更新: {self.file_id} -> {stage}")
  142. def __str__(self) -> str:
  143. """字符串表示"""
  144. return f"TaskChain(file_id={self.file_id}, status={self.status}, stage={self.current_stage})"
  145. def __repr__(self) -> str:
  146. """调试用的字符串表示"""
  147. return f"TaskChain(file_id='{self.file_id}', status='{self.status}', stage='{self.current_stage}', created='{self.created_at}')"