progress_manager.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. """
  2. 任务进度管理器
  3. 负责任务进度的存储、更新和查询
  4. """
  5. import json
  6. from typing import Dict, Any, Optional
  7. from datetime import datetime
  8. from foundation.logger.loggering import server_logger as logger
  9. class ProgressManager:
  10. """任务进度管理器"""
  11. def __init__(self):
  12. self.progress_data = {} # 简化:使用内存存储
  13. async def initialize_progress(self, callback_task_id: str, user_id: str, stages: list):
  14. """初始化进度记录"""
  15. try:
  16. self.progress_data[callback_task_id] = {
  17. "user_id": user_id,
  18. "overall_progress": 0,
  19. "current_stage": stages[0]["stage_name"] if stages else "",
  20. "stages": stages,
  21. "updated_at": datetime.now()
  22. }
  23. logger.info(f"初始化任务进度: {callback_task_id}")
  24. except Exception as e:
  25. logger.error(f"初始化进度失败: {str(e)}")
  26. raise
  27. async def update_stage_progress(self, callback_task_id: str, stage_name: str,
  28. progress: int, status: str, message: str = "",
  29. sub_progress: int = 0):
  30. """更新阶段进度"""
  31. try:
  32. if callback_task_id not in self.progress_data:
  33. logger.warning(f"任务进度不存在: {callback_task_id}")
  34. return
  35. task_progress = self.progress_data[callback_task_id]
  36. # 更新阶段进度
  37. for stage in task_progress["stages"]:
  38. if stage["stage_name"] == stage_name:
  39. stage["progress"] = progress
  40. stage["stage_status"] = status
  41. stage["message"] = message
  42. stage["sub_progress"] = sub_progress
  43. break
  44. # 更新当前阶段和整体进度
  45. task_progress["current_stage"] = stage_name
  46. task_progress["overall_progress"] = self._calculate_overall_progress(task_progress["stages"])
  47. task_progress["updated_at"] = datetime.now()
  48. logger.debug(f"更新进度: {callback_task_id}, 阶段: {stage_name}, 进度: {progress}%")
  49. except Exception as e:
  50. logger.error(f"更新阶段进度失败: {str(e)}")
  51. raise
  52. async def get_progress(self, callback_task_id: str) -> Optional[Dict[str, Any]]:
  53. """获取任务进度"""
  54. try:
  55. if callback_task_id not in self.progress_data:
  56. return None
  57. task_progress = self.progress_data[callback_task_id]
  58. # 计算整体状态
  59. if any(stage["stage_status"] == "failed" for stage in task_progress["stages"]):
  60. review_task_status = "failed"
  61. elif all(stage["stage_status"] == "completed" for stage in task_progress["stages"]):
  62. review_task_status = "completed"
  63. elif any(stage["stage_status"] == "processing" for stage in task_progress["stages"]):
  64. review_task_status = "processing"
  65. else:
  66. review_task_status = "pending"
  67. return {
  68. "callback_task_id": callback_task_id,
  69. "user_id": task_progress["user_id"],
  70. "review_task_status": review_task_status,
  71. "overall_progress": task_progress["overall_progress"],
  72. "stages": task_progress["stages"],
  73. "updated_at": int(task_progress["updated_at"].timestamp()),
  74. "estimated_remaining": 600
  75. }
  76. except Exception as e:
  77. logger.error(f"获取进度失败: {str(e)}")
  78. return None
  79. async def complete_task(self, callback_task_id: str, result: Dict[str, Any]):
  80. """标记任务完成"""
  81. try:
  82. if callback_task_id in self.progress_data:
  83. task_progress = self.progress_data[callback_task_id]
  84. # 完成最后一个阶段
  85. if task_progress["stages"]:
  86. task_progress["stages"][-1]["stage_status"] = "completed"
  87. task_progress["stages"][-1]["progress"] = 100
  88. task_progress["overall_progress"] = 100
  89. task_progress["updated_at"] = datetime.now()
  90. # 保存结果
  91. task_progress["result"] = result
  92. logger.info(f"任务完成: {callback_task_id}")
  93. except Exception as e:
  94. logger.error(f"标记任务完成失败: {str(e)}")
  95. raise
  96. def _calculate_overall_progress(self, stages: list) -> int:
  97. """计算整体进度"""
  98. if not stages:
  99. return 0
  100. total_progress = sum(stage["progress"] for stage in stages)
  101. return int(total_progress / len(stages))