task_models.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. """
  2. 任务相关的基础数据模型
  3. 包含 TaskFileInfo 和 TaskChain,与具体工作流实现解耦
  4. 设计原则:
  5. 1. 职责分离:TaskFileInfo 专注业务数据,TaskChain 专注执行状态
  6. 2. 数据隔离:通过引用避免重复存储
  7. 3. 类型安全:明确的属性定义和类型注解
  8. 4. 独立模块:可被其他模块独立导入,避免循环依赖
  9. """
  10. from typing import Dict, Optional
  11. from datetime import datetime
  12. from foundation.observability.logger.loggering import server_logger as logger
  13. class TaskFileInfo:
  14. """任务文件信息类 - 封装单个任务的文件和上下文信息"""
  15. def __init__(self, file_info: dict):
  16. """
  17. 初始化任务文件信息
  18. Args:
  19. file_info: 包含文件和任务信息的字典
  20. """
  21. # 核心标识信息
  22. self.file_id = file_info['file_id']
  23. self.user_id = file_info['user_id']
  24. self.callback_task_id = file_info['callback_task_id']
  25. # 文件基本信息
  26. self.file_name = file_info.get('file_name', '')
  27. self.file_type = file_info.get('file_type', '')
  28. self.file_content = file_info.get('file_content', b'')
  29. # 审查配置信息
  30. review_config_raw = file_info.get('review_config', [])
  31. # 类型校验:确保review_config是列表
  32. if isinstance(review_config_raw, list):
  33. self.review_config = review_config_raw
  34. else:
  35. # 如果不是列表,记录警告并使用默认空列表
  36. logger.warning(
  37. f"review_config类型错误,期望list,实际{type(review_config_raw).__name__},"
  38. f"值: {review_config_raw},将使用空列表"
  39. )
  40. self.review_config = []
  41. self.project_plan_type = file_info.get('project_plan_type', '')
  42. self.tendency_review_role = file_info.get('tendency_review_role', '')
  43. self.test_designation_chunk_flag = file_info.get('test_designation_chunk_flag', '')
  44. # 时间戳信息
  45. self.launched_at = file_info.get('launched_at', 0)
  46. # 保存完整的原始数据(只读访问)
  47. self._file_info = file_info.copy() # 防止意外修改原始数据
  48. logger.debug(f"创建TaskFileInfo: {self.file_id} for user {self.user_id}")
  49. def get_file_info(self) -> dict:
  50. """获取完整的原始文件信息(只读副本)"""
  51. return self._file_info.copy()
  52. def get_review_config_list(self) -> list:
  53. """获取审查配置列表"""
  54. return self.review_config.copy()
  55. def get_project_plan_type(self) -> str:
  56. """获取工程方案类型"""
  57. return self.project_plan_type
  58. def get_tendency_review_role(self) -> str:
  59. """获取倾向性审查角色"""
  60. return self.tendency_review_role
  61. def get_test_designation_chunk_flag(self) -> str:
  62. """获取测试定位标志符"""
  63. return self.test_designation_chunk_flag
  64. def has_review_type(self, review_type: str) -> bool:
  65. """检查是否包含指定的审查类型"""
  66. return review_type in self.review_config
  67. def get_file_size(self) -> int:
  68. """获取文件大小(字节)"""
  69. return len(self.file_content) if self.file_content else 0
  70. def __str__(self) -> str:
  71. """字符串表示"""
  72. return f"TaskFileInfo(file_id={self.file_id}, user_id={self.user_id})"
  73. def __repr__(self) -> str:
  74. """调试用的字符串表示"""
  75. return f"TaskFileInfo(file_id='{self.file_id}', user_id='{self.user_id}', task_id='{self.callback_task_id}')"
  76. class TaskChain:
  77. """任务链 - 工作流执行状态跟踪,通过引用TaskFileInfo避免数据重复"""
  78. def __init__(self, task_file_info: TaskFileInfo):
  79. """
  80. 初始化任务链,引用 TaskFileInfo 避免数据重复
  81. Args:
  82. task_file_info: 任务文件信息对象
  83. """
  84. # 引用任务文件信息(避免重复存储)
  85. self.task_info = task_file_info
  86. # 执行状态信息(创建时不预设状态)
  87. self.status: str = "pending" # pending, processing, completed, failed
  88. self.current_stage: str = "created"
  89. self.created_at = datetime.now()
  90. self.started_at: Optional[datetime] = None
  91. self.completed_at: Optional[datetime] = None
  92. self.results: Dict = {}
  93. logger.info(f"创建任务链: {self.file_id} for user {self.user_id}")
  94. # 属性访问代理,保持向后兼容
  95. @property
  96. def callback_task_id(self) -> str:
  97. """获取回调任务ID"""
  98. return self.task_info.callback_task_id
  99. @property
  100. def file_id(self) -> str:
  101. """获取文件ID"""
  102. return self.task_info.file_id
  103. @property
  104. def user_id(self) -> str:
  105. """获取用户ID"""
  106. return self.task_info.user_id
  107. # 便捷方法
  108. def start_processing(self):
  109. """标记任务开始处理"""
  110. self.started_at = datetime.now()
  111. self.status = "processing"
  112. self.current_stage = "document_processing" # 开始处理时设置初始阶段
  113. logger.info(f"任务开始处理: {self.file_id}")
  114. def complete_processing(self):
  115. """标记任务处理完成"""
  116. self.completed_at = datetime.now()
  117. self.status = "completed"
  118. logger.info(f"任务处理完成: {self.file_id}")
  119. def fail_processing(self, error_message: str = ""):
  120. """标记任务处理失败"""
  121. self.completed_at = datetime.now()
  122. self.status = "failed"
  123. logger.error(f"任务处理失败: {self.file_id}, 错误: {error_message}")
  124. def update_stage(self, stage: str):
  125. """更新当前处理阶段"""
  126. self.current_stage = stage
  127. logger.debug(f"任务阶段更新: {self.file_id} -> {stage}")
  128. def __str__(self) -> str:
  129. """字符串表示"""
  130. return f"TaskChain(file_id={self.file_id}, status={self.status}, stage={self.current_stage})"
  131. def __repr__(self) -> str:
  132. """调试用的字符串表示"""
  133. return f"TaskChain(file_id='{self.file_id}', status='{self.status}', stage='{self.current_stage}', created='{self.created_at}')"