task_models.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. """
  2. 任务相关的基础数据模型
  3. 包含 TaskFileInfo 和 TaskChain,与具体工作流实现解耦
  4. 设计原则:
  5. 1. 职责分离:TaskFileInfo 专注业务数据,TaskChain 专注执行状态
  6. 2. 数据隔离:通过引用避免重复存储
  7. 3. 类型安全:明确的属性定义和类型注解
  8. 4. 独立模块:可被其他模块独立导入,避免循环依赖
  9. """
  10. from typing import Dict, Optional
  11. from datetime import datetime
  12. from foundation.observability.logger.loggering import server_logger as logger
  13. class TaskFileInfo:
  14. """任务文件信息类 - 封装单个任务的文件和上下文信息"""
  15. def __init__(self, file_info: dict):
  16. """
  17. 初始化任务文件信息
  18. Args:
  19. file_info: 包含文件和任务信息的字典
  20. """
  21. # 核心标识信息
  22. self.file_id = file_info['file_id']
  23. self.user_id = file_info['user_id']
  24. self.callback_task_id = file_info['callback_task_id']
  25. # 文件基本信息
  26. self.file_name = file_info.get('file_name', '')
  27. self.file_type = file_info.get('file_type', '')
  28. self.file_content = file_info.get('file_content', b'')
  29. # 审查配置信息
  30. self.review_config = file_info.get('review_config', [])
  31. self.project_plan_type = file_info.get('project_plan_type', '')
  32. self.tendency_review_role = file_info.get('tendency_review_role', '')
  33. self.test_designation_chunk_flag = file_info.get('test_designation_chunk_flag', '')
  34. # 时间戳信息
  35. self.launched_at = file_info.get('launched_at', 0)
  36. # 保存完整的原始数据(只读访问)
  37. self._file_info = file_info.copy() # 防止意外修改原始数据
  38. logger.debug(f"创建TaskFileInfo: {self.file_id} for user {self.user_id}")
  39. def get_file_info(self) -> dict:
  40. """获取完整的原始文件信息(只读副本)"""
  41. return self._file_info.copy()
  42. def get_review_config_list(self) -> list:
  43. """获取审查配置列表"""
  44. return self.review_config.copy()
  45. def get_project_plan_type(self) -> str:
  46. """获取工程方案类型"""
  47. return self.project_plan_type
  48. def get_tendency_review_role(self) -> str:
  49. """获取倾向性审查角色"""
  50. return self.tendency_review_role
  51. def get_test_designation_chunk_flag(self) -> str:
  52. """获取测试定位标志符"""
  53. return self.test_designation_chunk_flag
  54. def has_review_type(self, review_type: str) -> bool:
  55. """检查是否包含指定的审查类型"""
  56. return review_type in self.review_config
  57. def get_file_size(self) -> int:
  58. """获取文件大小(字节)"""
  59. return len(self.file_content) if self.file_content else 0
  60. def __str__(self) -> str:
  61. """字符串表示"""
  62. return f"TaskFileInfo(file_id={self.file_id}, user_id={self.user_id})"
  63. def __repr__(self) -> str:
  64. """调试用的字符串表示"""
  65. return f"TaskFileInfo(file_id='{self.file_id}', user_id='{self.user_id}', task_id='{self.callback_task_id}')"
  66. class TaskChain:
  67. """任务链 - 工作流执行状态跟踪,通过引用TaskFileInfo避免数据重复"""
  68. def __init__(self, task_file_info: TaskFileInfo):
  69. """
  70. 初始化任务链,引用 TaskFileInfo 避免数据重复
  71. Args:
  72. task_file_info: 任务文件信息对象
  73. """
  74. # 引用任务文件信息(避免重复存储)
  75. self.task_info = task_file_info
  76. # 执行状态信息(创建时不预设状态)
  77. self.status: str = "pending" # pending, processing, completed, failed
  78. self.current_stage: str = "created"
  79. self.created_at = datetime.now()
  80. self.started_at: Optional[datetime] = None
  81. self.completed_at: Optional[datetime] = None
  82. self.results: Dict = {}
  83. logger.info(f"创建任务链: {self.file_id} for user {self.user_id}")
  84. # 属性访问代理,保持向后兼容
  85. @property
  86. def callback_task_id(self) -> str:
  87. """获取回调任务ID"""
  88. return self.task_info.callback_task_id
  89. @property
  90. def file_id(self) -> str:
  91. """获取文件ID"""
  92. return self.task_info.file_id
  93. @property
  94. def user_id(self) -> str:
  95. """获取用户ID"""
  96. return self.task_info.user_id
  97. # 便捷方法
  98. def start_processing(self):
  99. """标记任务开始处理"""
  100. self.started_at = datetime.now()
  101. self.status = "processing"
  102. self.current_stage = "document_processing" # 开始处理时设置初始阶段
  103. logger.info(f"任务开始处理: {self.file_id}")
  104. def complete_processing(self):
  105. """标记任务处理完成"""
  106. self.completed_at = datetime.now()
  107. self.status = "completed"
  108. logger.info(f"任务处理完成: {self.file_id}")
  109. def fail_processing(self, error_message: str = ""):
  110. """标记任务处理失败"""
  111. self.completed_at = datetime.now()
  112. self.status = "failed"
  113. logger.error(f"任务处理失败: {self.file_id}, 错误: {error_message}")
  114. def update_stage(self, stage: str):
  115. """更新当前处理阶段"""
  116. self.current_stage = stage
  117. logger.debug(f"任务阶段更新: {self.file_id} -> {stage}")
  118. def __str__(self) -> str:
  119. """字符串表示"""
  120. return f"TaskChain(file_id={self.file_id}, status={self.status}, stage={self.current_stage})"
  121. def __repr__(self) -> str:
  122. """调试用的字符串表示"""
  123. return f"TaskChain(file_id='{self.file_id}', status='{self.status}', stage='{self.current_stage}', created='{self.created_at}')"