""" 任务进度管理器 负责任务进度的存储、更新和查询 """ import json from typing import Dict, Any, Optional from datetime import datetime from foundation.logger.loggering import server_logger as logger class ProgressManager: """任务进度管理器""" def __init__(self): self.progress_data = {} # 简化:使用内存存储 async def initialize_progress(self, callback_task_id: str, user_id: str, stages: list): """初始化进度记录""" try: self.progress_data[callback_task_id] = { "user_id": user_id, "overall_progress": 0, "current_stage": stages[0]["stage_name"] if stages else "", "stages": stages, "updated_at": datetime.now() } logger.info(f"初始化任务进度: {callback_task_id}") except Exception as e: logger.error(f"初始化进度失败: {str(e)}") raise async def update_stage_progress(self, callback_task_id: str, stage_name: str, progress: int, status: str, message: str = "", sub_progress: int = 0): """更新阶段进度""" try: if callback_task_id not in self.progress_data: logger.warning(f"任务进度不存在: {callback_task_id}") return task_progress = self.progress_data[callback_task_id] # 更新阶段进度 for stage in task_progress["stages"]: if stage["stage_name"] == stage_name: stage["progress"] = progress stage["stage_status"] = status stage["message"] = message stage["sub_progress"] = sub_progress break # 更新当前阶段和整体进度 task_progress["current_stage"] = stage_name task_progress["overall_progress"] = self._calculate_overall_progress(task_progress["stages"]) task_progress["updated_at"] = datetime.now() logger.debug(f"更新进度: {callback_task_id}, 阶段: {stage_name}, 进度: {progress}%") except Exception as e: logger.error(f"更新阶段进度失败: {str(e)}") raise async def get_progress(self, callback_task_id: str) -> Optional[Dict[str, Any]]: """获取任务进度""" try: if callback_task_id not in self.progress_data: return None task_progress = self.progress_data[callback_task_id] # 计算整体状态 if any(stage["stage_status"] == "failed" for stage in task_progress["stages"]): review_task_status = "failed" elif all(stage["stage_status"] == "completed" for stage in task_progress["stages"]): review_task_status = "completed" elif any(stage["stage_status"] == "processing" for stage in task_progress["stages"]): review_task_status = "processing" else: review_task_status = "pending" return { "callback_task_id": callback_task_id, "user_id": task_progress["user_id"], "review_task_status": review_task_status, "overall_progress": task_progress["overall_progress"], "stages": task_progress["stages"], "updated_at": int(task_progress["updated_at"].timestamp()), "estimated_remaining": 600 } except Exception as e: logger.error(f"获取进度失败: {str(e)}") return None async def complete_task(self, callback_task_id: str, result: Dict[str, Any]): """标记任务完成""" try: if callback_task_id in self.progress_data: task_progress = self.progress_data[callback_task_id] # 完成最后一个阶段 if task_progress["stages"]: task_progress["stages"][-1]["stage_status"] = "completed" task_progress["stages"][-1]["progress"] = 100 task_progress["overall_progress"] = 100 task_progress["updated_at"] = datetime.now() # 保存结果 task_progress["result"] = result logger.info(f"任务完成: {callback_task_id}") except Exception as e: logger.error(f"标记任务完成失败: {str(e)}") raise def _calculate_overall_progress(self, stages: list) -> int: """计算整体进度""" if not stages: return 0 total_progress = sum(stage["progress"] for stage in stages) return int(total_progress / len(stages))