task_models.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. """
  2. 任务相关的基础数据模型
  3. 包含 TaskFileInfo 和 TaskChain,与具体工作流实现解耦
  4. 设计原则:
  5. 1. 职责分离:TaskFileInfo 专注业务数据,TaskChain 专注执行状态
  6. 2. 数据隔离:通过引用避免重复存储
  7. 3. 类型安全:明确的属性定义和类型注解
  8. 4. 独立模块:可被其他模块独立导入,避免循环依赖
  9. """
  10. from typing import Dict, Optional
  11. from datetime import datetime
  12. from foundation.observability.logger.loggering import review_logger as logger
  13. class TaskFileInfo:
  14. """任务文件信息类 - 封装单个任务的文件和上下文信息"""
  15. def __init__(self, file_info: dict):
  16. """
  17. 初始化任务文件信息
  18. Args:
  19. file_info: 包含文件和任务信息的字典
  20. """
  21. # 核心标识信息
  22. self.file_id = file_info['file_id']
  23. self.user_id = file_info['user_id']
  24. self.callback_task_id = file_info['callback_task_id']
  25. # 文件基本信息
  26. self.file_name = file_info.get('file_name', '')
  27. self.file_type = file_info.get('file_type', '')
  28. self.file_content = file_info.get('file_content', b'')
  29. # 审查配置信息(使用 .get() 提供默认值,支持键不存在的情况)
  30. review_item_config_raw = file_info.get('review_item_config')
  31. # 类型校验:确保review_item_config是列表
  32. if isinstance(review_item_config_raw, list):
  33. self.review_item_config = review_item_config_raw
  34. else:
  35. # 如果不是列表,记录警告并使用默认空列表
  36. logger.warning(
  37. f"review_item_config类型错误,期望list,实际{type(review_item_config_raw).__name__},"
  38. f"值: {review_item_config_raw},将使用空列表"
  39. )
  40. self.review_item_config = []
  41. self.project_plan_type = file_info.get('project_plan_type', '')
  42. self.tendency_review_role = file_info.get('tendency_review_role', '')
  43. self.test_designation_chunk_flag = file_info.get('test_designation_chunk_flag', '')
  44. # 时间戳信息
  45. self.launched_at = file_info.get('launched_at', 0)
  46. # 保存完整的原始数据(只读访问)
  47. self._file_info = file_info.copy() # 防止意外修改原始数据
  48. logger.debug(f"创建TaskFileInfo: {self.file_id} for user {self.user_id}")
  49. def get_file_info(self) -> dict:
  50. """获取完整的原始文件信息(只读副本)"""
  51. return self._file_info.copy()
  52. def get_review_config_list(self) -> list:
  53. """[已废弃] 获取审查配置列表 — 旧 review_config 已移除,仅保留兼容性"""
  54. logger.warning("get_review_config_list() 已废弃,请使用 get_review_item_config_list()")
  55. return []
  56. def get_review_item_config_list(self) -> list:
  57. """获取审查项配置列表(章节_审查维度格式)"""
  58. return self.review_item_config.copy()
  59. def get_project_plan_type(self) -> str:
  60. """获取工程方案类型"""
  61. return self.project_plan_type
  62. def get_tendency_review_role(self) -> str:
  63. """获取倾向性审查角色"""
  64. return self.tendency_review_role
  65. def get_test_designation_chunk_flag(self) -> str:
  66. """获取测试定位标志符"""
  67. return self.test_designation_chunk_flag
  68. def get_file_size(self) -> int:
  69. """获取文件大小(字节)"""
  70. return len(self.file_content) if self.file_content else 0
  71. def __str__(self) -> str:
  72. """字符串表示"""
  73. return f"TaskFileInfo(file_id={self.file_id}, user_id={self.user_id})"
  74. def __repr__(self) -> str:
  75. """调试用的字符串表示"""
  76. return f"TaskFileInfo(file_id='{self.file_id}', user_id='{self.user_id}', task_id='{self.callback_task_id}')"
  77. class TaskChain:
  78. """任务链 - 工作流执行状态跟踪,通过引用TaskFileInfo避免数据重复"""
  79. def __init__(self, task_file_info: TaskFileInfo):
  80. """
  81. 初始化任务链,引用 TaskFileInfo 避免数据重复
  82. Args:
  83. task_file_info: 任务文件信息对象
  84. """
  85. # 引用任务文件信息(避免重复存储)
  86. self.task_info = task_file_info
  87. # 执行状态信息(创建时不预设状态)
  88. self.status: str = "pending" # pending, processing, completed, failed
  89. self.current_stage: str = "created"
  90. self.created_at = datetime.now()
  91. self.started_at: Optional[datetime] = None
  92. self.completed_at: Optional[datetime] = None
  93. self.results: Dict = {}
  94. logger.info(f"创建任务链: {self.file_id} for user {self.user_id}")
  95. # 属性访问代理,保持向后兼容
  96. @property
  97. def callback_task_id(self) -> str:
  98. """获取回调任务ID"""
  99. return self.task_info.callback_task_id
  100. @property
  101. def file_id(self) -> str:
  102. """获取文件ID"""
  103. return self.task_info.file_id
  104. @property
  105. def user_id(self) -> str:
  106. """获取用户ID"""
  107. return self.task_info.user_id
  108. # 便捷方法
  109. def start_processing(self):
  110. """标记任务开始处理"""
  111. self.started_at = datetime.now()
  112. self.status = "processing"
  113. self.current_stage = "document_processing" # 开始处理时设置初始阶段
  114. logger.info(f"任务开始处理: {self.file_id}")
  115. def complete_processing(self):
  116. """标记任务处理完成"""
  117. self.completed_at = datetime.now()
  118. self.status = "completed"
  119. logger.info(f"任务处理完成: {self.file_id}")
  120. def fail_processing(self, error_message: str = ""):
  121. """标记任务处理失败"""
  122. self.completed_at = datetime.now()
  123. self.status = "failed"
  124. logger.error(f"任务处理失败: {self.file_id}, 错误: {error_message}")
  125. def update_stage(self, stage: str):
  126. """更新当前处理阶段"""
  127. self.current_stage = stage
  128. logger.debug(f"任务阶段更新: {self.file_id} -> {stage}")
  129. def __str__(self) -> str:
  130. """字符串表示"""
  131. return f"TaskChain(file_id={self.file_id}, status={self.status}, stage={self.current_stage})"
  132. def __repr__(self) -> str:
  133. """调试用的字符串表示"""
  134. return f"TaskChain(file_id='{self.file_id}', status='{self.status}', stage='{self.current_stage}', created='{self.created_at}')"