| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- """
- 任务进度管理器
- 负责任务进度的存储、更新和查询
- """
- import json
- from typing import Dict, Any, Optional
- from datetime import datetime
- from foundation.logger.loggering import server_logger as logger
class ProgressManager:
    """Track per-task progress in an in-memory dictionary.

    Each task is keyed by its ``callback_task_id``. A record holds the user
    id, the list of stage dicts, the aggregate progress, the name of the
    current stage and the time of the last update. Nothing is persisted.

    NOTE(review): stage dicts are stored and returned by reference, so the
    caller's objects are mutated by updates -- confirm this is intended.
    """

    def __init__(self):
        # Simplification: everything lives in process memory.
        self.progress_data: Dict[str, Dict[str, Any]] = {}

    async def initialize_progress(self, callback_task_id: str, user_id: str, stages: list):
        """Create (or overwrite) the progress record for a task.

        Args:
            callback_task_id: Key under which the record is stored.
            user_id: Stored verbatim in the record.
            stages: List of stage dicts. The first entry's ``"stage_name"``
                becomes the current stage; an empty list yields ``""``.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            self.progress_data[callback_task_id] = {
                "user_id": user_id,
                "overall_progress": 0,
                "current_stage": stages[0]["stage_name"] if stages else "",
                "stages": stages,
                "updated_at": datetime.now(),
            }
            logger.info(f"初始化任务进度: {callback_task_id}")
        except Exception as e:
            logger.error(f"初始化进度失败: {str(e)}")
            raise

    async def update_stage_progress(self, callback_task_id: str, stage_name: str,
                                    progress: int, status: str, message: str = "",
                                    sub_progress: int = 0):
        """Update one stage of a task and refresh the aggregate progress.

        Only the first stage whose ``"stage_name"`` equals ``stage_name`` is
        modified. ``current_stage`` is set to ``stage_name`` whether or not
        a stage matched. Unknown task ids are logged as a warning and
        otherwise ignored.

        Args:
            callback_task_id: Task whose record is updated.
            stage_name: Name of the stage to update.
            progress: Value written to the stage's ``"progress"`` key.
            status: Value written to the stage's ``"stage_status"`` key.
            message: Value written to the stage's ``"message"`` key.
            sub_progress: Value written to the stage's ``"sub_progress"`` key.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            if callback_task_id not in self.progress_data:
                logger.warning(f"任务进度不存在: {callback_task_id}")
                return

            task_progress = self.progress_data[callback_task_id]

            # Update the matching stage (first match only).
            for stage in task_progress["stages"]:
                if stage["stage_name"] == stage_name:
                    stage["progress"] = progress
                    stage["stage_status"] = status
                    stage["message"] = message
                    stage["sub_progress"] = sub_progress
                    break

            # Refresh the current stage and the aggregate progress.
            task_progress["current_stage"] = stage_name
            task_progress["overall_progress"] = self._calculate_overall_progress(task_progress["stages"])
            task_progress["updated_at"] = datetime.now()

            logger.debug(f"更新进度: {callback_task_id}, 阶段: {stage_name}, 进度: {progress}%")
        except Exception as e:
            logger.error(f"更新阶段进度失败: {str(e)}")
            raise

    async def get_progress(self, callback_task_id: str) -> Optional[Dict[str, Any]]:
        """Return a snapshot of a task's progress.

        The overall status is derived from the stage statuses, in priority
        order: ``"failed"`` if any stage failed; ``"completed"`` if every
        stage is completed; ``"processing"`` if any stage is processing;
        otherwise ``"pending"``. A stage without a ``"stage_status"`` key
        counts as not started. A task with no stages is ``"completed"``
        only once its overall progress has reached 100.

        Args:
            callback_task_id: Task to look up.

        Returns:
            A dict with the task id, user id, derived status, overall
            progress, the stage list, the last-update time as an integer
            timestamp and a fixed ``estimated_remaining`` of 600; or
            ``None`` when the task is unknown or an error occurs (errors
            are logged, not raised).
        """
        try:
            if callback_task_id not in self.progress_data:
                return None

            task_progress = self.progress_data[callback_task_id]
            # .get(): stages that were never updated may lack "stage_status".
            statuses = [stage.get("stage_status") for stage in task_progress["stages"]]

            # Derive the overall status.
            if "failed" in statuses:
                review_task_status = "failed"
            elif statuses and all(status == "completed" for status in statuses):
                review_task_status = "completed"
            elif not statuses and task_progress["overall_progress"] >= 100:
                # all() over an empty list is True, so a stage-less task must
                # be judged by its overall progress instead.
                review_task_status = "completed"
            elif "processing" in statuses:
                review_task_status = "processing"
            else:
                review_task_status = "pending"

            return {
                "callback_task_id": callback_task_id,
                "user_id": task_progress["user_id"],
                "review_task_status": review_task_status,
                "overall_progress": task_progress["overall_progress"],
                "stages": task_progress["stages"],
                "updated_at": int(task_progress["updated_at"].timestamp()),
                # Placeholder constant; no estimate is actually computed.
                "estimated_remaining": 600,
            }
        except Exception as e:
            logger.error(f"获取进度失败: {str(e)}")
            return None

    async def complete_task(self, callback_task_id: str, result: Dict[str, Any]):
        """Mark a task as finished and attach its result.

        The last stage (if any) is set to ``"completed"`` at 100, the overall
        progress is forced to 100 and ``result`` is stored under the
        record's ``"result"`` key. Earlier stages are left untouched.
        Unknown task ids are logged as a warning and otherwise ignored.

        Args:
            callback_task_id: Task to complete.
            result: Payload stored on the record.

        Raises:
            Exception: Any failure is logged and re-raised unchanged.
        """
        try:
            if callback_task_id not in self.progress_data:
                # Same handling as update_stage_progress for unknown tasks.
                logger.warning(f"任务进度不存在: {callback_task_id}")
                return

            task_progress = self.progress_data[callback_task_id]

            # Finish the last stage.
            if task_progress["stages"]:
                final_stage = task_progress["stages"][-1]
                final_stage["stage_status"] = "completed"
                final_stage["progress"] = 100

            task_progress["overall_progress"] = 100
            task_progress["updated_at"] = datetime.now()

            # Keep the result alongside the progress record.
            task_progress["result"] = result

            logger.info(f"任务完成: {callback_task_id}")
        except Exception as e:
            logger.error(f"标记任务完成失败: {str(e)}")
            raise

    def _calculate_overall_progress(self, stages: list) -> int:
        """Return the mean of the stages' ``"progress"`` values as an int.

        Stages without a ``"progress"`` key contribute 0. An empty stage
        list yields 0. The mean is truncated by ``int()``.
        """
        if not stages:
            return 0

        total_progress = sum(stage.get("progress", 0) for stage in stages)
        return int(total_progress / len(stages))
|