|
|
@@ -433,16 +433,34 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
|
|
|
yield format_sse_event("error", error_data)
|
|
|
|
|
|
finally:
|
|
|
- # 清理回调连接(确保资源被正确释放)
|
|
|
+ # 延迟清理,给后台任务时间完成最后的回调
|
|
|
try:
|
|
|
- sse_callback_manager.unregister_callback(callback_task_id)
|
|
|
- except Exception as cleanup_error:
|
|
|
- logger.warning(f"清理回调连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
|
|
|
+ # 等待可能的最后回调(3秒)
|
|
|
+ import asyncio
|
|
|
+ await asyncio.sleep(3)
|
|
|
|
|
|
- try:
|
|
|
- await unified_sse_manager.close_connection(callback_task_id)
|
|
|
+ # 检查任务状态后再决定是否清理
|
|
|
+ # 使用文件顶部已导入的 progress_manager 实例
|
|
|
+ try:
|
|
|
+ task_progress = await progress_manager.get_progress(callback_task_id)
|
|
|
+
|
|
|
+ # 如果任务状态不存在,说明任务已完成且状态已被清理
|
|
|
+ if task_progress is None:
|
|
|
+ logger.info(f"任务已完成且状态已清理,清理SSE回调: {callback_task_id}")
|
|
|
+ sse_callback_manager.unregister_callback(callback_task_id)
|
|
|
+ elif task_progress.get("overall_task_status") in ["completed", "failed"]:
|
|
|
+ # 任务明确标记为完成或失败
|
|
|
+ logger.info(f"任务已完成,清理SSE回调: {callback_task_id}")
|
|
|
+ sse_callback_manager.unregister_callback(callback_task_id)
|
|
|
+ else:
|
|
|
+ # 任务仍在进行中
|
|
|
+ logger.warning(f"任务可能仍在进行中,保留SSE连接: {callback_task_id}")
|
|
|
+ except Exception as status_check_error:
|
|
|
+ logger.warning(f"检查任务状态失败,强制清理: {callback_task_id}, 错误: {str(status_check_error)}")
|
|
|
+ # 如果无法检查状态,还是进行清理以避免资源泄露
|
|
|
+ sse_callback_manager.unregister_callback(callback_task_id)
|
|
|
except Exception as cleanup_error:
|
|
|
- logger.warning(f"断开SSE连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
|
|
|
+ logger.warning(f"延迟清理SSE连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
|
|
|
|
|
|
logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
|
|
|
|