ソースを参照

v0.0.3-异常关闭逻辑修复
- Lq-异构GPU-Qwen3 8B模型接入
- gemini-模型接入优化

WangXuMing 2 ヶ月 前
コミット
0fd1218699

BIN
build_graph_app.png


+ 8 - 8
config/config.ini

@@ -1,14 +1,14 @@
 
 
 
 
 [model]
 [model]
-MODEL_TYPE=gemini-2.0-flash
+MODEL_TYPE=gemini
 
 
 
 
 
 
 [gemini]
 [gemini]
-GEMINI_SERVER_URL=https://generativelanguage.googleapis.com
+GEMINI_SERVER_URL=https://generativelanguage.googleapis.com/v1beta/openai/
 GEMINI_MODEL_ID=gemini-2.0-flash
 GEMINI_MODEL_ID=gemini-2.0-flash
-GEMINI_API_KEY=AIzaSyDcL1AZS4u9N-8OyE7q7M25wvYZhj2okJc
+GEMINI_API_KEY=AIzaSyBwcjYoxci4QM1mqIaVcbIf_zmsrN9yuWE
 
 
 [deepseek]
 [deepseek]
 DEEPSEEK_SERVER_URL=https://api.deepseek.com
 DEEPSEEK_SERVER_URL=https://api.deepseek.com
@@ -29,7 +29,7 @@ QWEN_API_KEY=ms-9ad4a379-d592-4acd-b92c-8bac08a4a045
 
 
 [ai_review]
 [ai_review]
 # 调试模式配置
 # 调试模式配置
-MAX_REVIEW_UNITS=1
+MAX_REVIEW_UNITS=10
 REVIEW_MODE=random
 REVIEW_MODE=random
 # REVIEW_MODE=all/random/first
 # REVIEW_MODE=all/random/first
 
 
@@ -66,10 +66,10 @@ SLCF_EMBED_MODEL_ID=netease-youdao/bce-embedding-base_v1
 SLCF_REANKER_MODEL_ID=BAAI/bge-reranker-v2-m3
 SLCF_REANKER_MODEL_ID=BAAI/bge-reranker-v2-m3
 SLCF_VL_CHAT_MODEL_ID=THUDM/GLM-4.1V-9B-Thinking
 SLCF_VL_CHAT_MODEL_ID=THUDM/GLM-4.1V-9B-Thinking
 
 
-[qwen_local_1.5b]
-QWEN_LOCAL_1_5B_SERVER_URL=http://172.16.35.50:8000/v1
-QWEN_LOCAL_1_5B_MODEL_ID=Qwen2.5-1.5B-Instruct
-QWEN_LOCAL_1_5B_API_KEY=sk-dummy
+[lq_qwen3_8b]
+QWEN_LOCAL_1_5B_SERVER_URL=http://192.168.91.253:9000/v1
+QWEN_LOCAL_1_5B_MODEL_ID=/mnt/Qwen3-8B
+QWEN_LOCAL_1_5B_API_KEY=dummy
 
 
 [qwen_local_14b]
 [qwen_local_14b]
 QWEN_LOCAL_14B_SERVER_URL=http://172.16.35.50:8003/v1
 QWEN_LOCAL_14B_SERVER_URL=http://172.16.35.50:8003/v1

+ 24 - 60
core/base/progress_manager.py

@@ -5,59 +5,46 @@ from datetime import datetime
 
 
 from foundation.logger.loggering import server_logger as logger
 from foundation.logger.loggering import server_logger as logger
 from foundation.base.config import config_handler
 from foundation.base.config import config_handler
+from core.base.sse_manager import unified_sse_manager
+
 
 
 class SSECallbackManager:
 class SSECallbackManager:
-    """SSE回调管理器 - 单例模式"""
-    _instance = None
-    _callbacks = {}
+    """
+    SSE回调管理器 - 兼容性包装器,委托给统一SSE管理器
 
 
-    def __new__(cls):
-        if cls._instance is None:
-            cls._instance = super().__new__(cls)
-        return cls._instance
+    注意: 此类保持向后兼容,建议直接使用 unified_sse_manager
+    """
 
 
     def register_callback(self, callback_task_id: str, callback_func):
     def register_callback(self, callback_task_id: str, callback_func):
-        self._callbacks[callback_task_id] = callback_func
-        logger.info(f"SSE回调注册, 当前注册数: {len(self._callbacks)}")
+        """注册回调函数"""
+        unified_sse_manager.register_callback_only(callback_task_id, callback_func)
 
 
     def unregister_callback(self, callback_task_id: str):
     def unregister_callback(self, callback_task_id: str):
-        if callback_task_id in self._callbacks:
-            del self._callbacks[callback_task_id]
-            logger.info(f"SSE回调注销, 剩余注册数: {len(self._callbacks)}")
+        """注销回调函数"""
+        unified_sse_manager.unregister_callback_only(callback_task_id)
 
 
     def is_callback_registered(self, callback_task_id: str) -> bool:
     def is_callback_registered(self, callback_task_id: str) -> bool:
         """检查回调是否已注册"""
         """检查回调是否已注册"""
-        return callback_task_id in self._callbacks
+        return unified_sse_manager.is_callback_registered(callback_task_id)
 
 
     async def trigger_callback(self, callback_task_id: str, current_data: dict):
     async def trigger_callback(self, callback_task_id: str, current_data: dict):
-        if callback_task_id in self._callbacks:
-            try:
-                await self._callbacks[callback_task_id](callback_task_id, current_data)
-                logger.debug(f"SSE回调执行成功: {callback_task_id}")
-                logger.debug(f"SSE回调已触发: {callback_task_id}, 当前注册回调数: {len(self._callbacks)}")
-                return True
-            except Exception as e:
-                logger.error(f"SSE回调执行失败: {callback_task_id}, {e}")
-                return False
-        else:
-            logger.debug(f"未找到SSE回调: {callback_task_id}, 当前注册回调数: {len(self._callbacks)}, 已注册ID: {list(self._callbacks.keys())}")
-            return False
+        """触发回调函数"""
+        return await unified_sse_manager.trigger_callback(callback_task_id, current_data)
 
 
     def get_callbacks_count(self):
     def get_callbacks_count(self):
-        return len(self._callbacks)
+        """获取回调数量"""
+        return unified_sse_manager.get_callback_count()
 
 
     def clear_all_callbacks(self):
     def clear_all_callbacks(self):
-        self._callbacks.clear()
-        logger.info("已清空所有SSE回调")
+        """清空所有回调"""
+        asyncio.create_task(unified_sse_manager.clear_all())
 
 
-    def force_close_sse(self, callback_task_id: str):
-        """强制关闭SSE连接"""
-        if callback_task_id in self._callbacks:
-            del self._callbacks[callback_task_id]
-            logger.info(f"强制关闭SSE连接: {callback_task_id}")
-        else:
-            logger.warning(f"SSE连接已不存在,无需关闭: {callback_task_id}")
+    async def force_close_sse(self, callback_task_id: str):
+        """强制关闭SSE连接 - 同步调用close_connection(兼容性方法)"""
+        await unified_sse_manager.close_connection(callback_task_id)
 
 
+
+# 创建兼容性实例
 sse_callback_manager = SSECallbackManager()
 sse_callback_manager = SSECallbackManager()
 
 
 class ProgressManager:
 class ProgressManager:
@@ -301,34 +288,11 @@ class ProgressManager:
             return None
             return None
 
 
     async def complete_task(self, callback_task_id: str, user_id: str = None, current_data: dict = None):
     async def complete_task(self, callback_task_id: str, user_id: str = None, current_data: dict = None):
-        """标记任务完成"""
+        """标记任务完成 - 使用单一同步强制关闭逻辑"""
 
 
         try:
         try:
-            logger.info(f"保存审查结果: {callback_task_id}")
-
-            # 使用update_stage_progress方法更新响应数据,但不推送SSE
-            await self.update_stage_progress(
-                callback_task_id=callback_task_id,
-                user_id=user_id,
-                current=current_data.get("current", 100) if current_data else 100,
-                stage_name="审查完成",
-                status="completed",
-                message="施工审查方案处理完成!",
-                overall_task_status='completed',
-                issues=current_data.get("issues", []) if current_data else []
-            )
-
             logger.info(f"取消注册任务: {callback_task_id}")
             logger.info(f"取消注册任务: {callback_task_id}")
-            # 先取消注册,再强制关闭,确保彻底清理
-            sse_callback_manager.unregister_callback(callback_task_id)
-
-            # 强制关闭SSE连接(防止残留)
-            sse_callback_manager.force_close_sse(callback_task_id)
-
-            logger.info(f"SSE连接已彻底关闭: {callback_task_id}")
-
-
-
+            await unified_sse_manager.close_connection(callback_task_id)
             logger.info(f"任务关闭: {callback_task_id}")
             logger.info(f"任务关闭: {callback_task_id}")
         except Exception as e:
         except Exception as e:
             logger.error(f"标记任务完成失败: {str(e)}")
             logger.error(f"标记任务完成失败: {str(e)}")

+ 274 - 0
core/base/sse_manager.py

@@ -0,0 +1,274 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+@Project   : lq-agent-api
+@File      : sse_manager.py
+@IDE       : VsCode
+@Author    :
+@Date      : 2025-12-04 10:58:00
+
+=================================
+
+📋 统一SSE管理器 (Unified SSE Manager)
+
+🏗️ 核心功能:
+├── UnifiedSSEManager()            # 统一SSE管理器(单例)
+├── establish_connection()         # 建立连接并注册回调
+├── close_connection()             # 关闭连接(清理连接和回调)
+├── send_progress()                # 发送进度消息
+└── trigger_callback()             # 触发回调函数
+
+📊 状态管理:
+├── connections                    # 消息队列字典
+├── callbacks                      # 回调函数字典
+└── get_connection_count()         # 获取连接数统计
+
+🔧 实用方法:
+├── is_connected()                 # 检查连接是否存在
+├── is_callback_registered()       # 检查回调是否已注册
+├── get_stats()                    # 获取详细统计信息
+└── clear_all()                    # 清理所有连接和回调
+'''
+
+import asyncio
+from typing import Dict, Any, Optional, Callable
+from datetime import datetime
+
+from foundation.logger.loggering import server_logger as logger
+
+
+class UnifiedSSEManager:
+    """
+    统一的SSE管理器 - 管理SSE连接、回调函数和消息推送
+
+    功能:
+    1. 管理SSE消息队列连接
+    2. 管理回调函数注册和触发
+    3. 提供统一的消息推送接口
+    4. 确保连接和回调状态同步
+    """
+
+    _instance = None
+
+    def __new__(cls):
+        """单例模式实现"""
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+            cls._instance.connections = {}  # 消息队列字典
+            cls._instance.callbacks = {}    # 回调函数字典
+        return cls._instance
+
+    def __init__(self):
+        """初始化统一SSE管理器"""
+        pass  # 在__new__中已完成初始化
+
+    async def establish_connection(self, callback_task_id: str, callback_func: Optional[Callable] = None):
+        """
+        建立SSE连接并注册回调函数
+
+        Args:
+            callback_task_id: 回调任务ID
+            callback_func: 可选的回调函数
+
+        Returns:
+            asyncio.Queue: 消息队列,用于SSE事件流
+        """
+        try:
+            # 创建消息队列
+            queue = asyncio.Queue()
+            self.connections[callback_task_id] = queue
+
+            # 注册回调函数(如果提供)
+            if callback_func:
+                self.callbacks[callback_task_id] = callback_func
+
+            # 发送连接建立确认消息
+            await queue.put({
+                "type": "connection_established",
+                "callback_task_id": callback_task_id,
+                "timestamp": datetime.now().isoformat()
+            })
+
+            logger.info(f"SSE连接已建立: {callback_task_id}")
+            logger.info(f"当前连接数: {len(self.connections)}, 回调数: {len(self.callbacks)}")
+
+            return queue
+
+        except Exception as e:
+            logger.error(f"建立SSE连接失败: {callback_task_id}, 错误: {str(e)}")
+            raise
+
+    async def close_connection(self, callback_task_id: str):
+        """
+        关闭SSE连接(同时清理连接和回调)
+
+        Args:
+            callback_task_id: 回调任务ID
+        """
+        try:
+            connection_existed = False
+            callback_existed = False
+
+            # 1. 先向队列发送结束信号,让SSE流能够正常结束
+            if callback_task_id in self.connections:
+                queue = self.connections[callback_task_id]
+                try:
+                    await queue.put({
+                        "type": "connection_closed",
+                        "callback_task_id": callback_task_id,
+                        "timestamp": datetime.now().isoformat()
+                    })
+                    logger.info(f"已发送连接关闭信号到队列: {callback_task_id}")
+                except Exception as queue_error:
+                    logger.warning(f"发送关闭信号失败,队列可能已关闭: {callback_task_id}, 错误: {str(queue_error)}")
+
+            # 2. 清理连接
+            if callback_task_id in self.connections:
+                del self.connections[callback_task_id]
+                connection_existed = True
+                logger.info(f"SSE连接已断开: {callback_task_id}")
+
+            # 3. 清理回调
+            if callback_task_id in self.callbacks:
+                del self.callbacks[callback_task_id]
+                callback_existed = True
+                logger.info(f"SSE回调已注销: {callback_task_id}")
+
+            if not connection_existed and not callback_existed:
+                logger.debug(f"SSE连接和回调均不存在: {callback_task_id}")
+            else:
+                logger.info(f"SSE连接清理完成: {callback_task_id}, 剩余连接数: {len(self.connections)}, 剩余回调数: {len(self.callbacks)}")
+
+        except Exception as e:
+            logger.error(f"关闭SSE连接时出错: {callback_task_id}, 错误: {str(e)}")
+
+    async def send_progress(self, callback_task_id: str, current_data: dict):
+        """
+        发送进度消息到指定连接
+
+        Args:
+            callback_task_id: 回调任务ID
+            current_data: 进度数据
+        """
+        try:
+            queue = self.connections.get(callback_task_id)
+            if queue:
+                # 确定事件类型
+                event_type = current_data.get("event_type", "processing")
+
+                # 处理特殊的单元审查事件
+                if event_type == "unit_review" or (event_type == "processing" and current_data.get("status") == "unit_review_update"):
+                    event_type = "unit_review_update"
+
+                # 添加时间戳
+                message = {
+                    "type": event_type,
+                    "data": current_data,
+                    "timestamp": datetime.now().isoformat()
+                }
+
+                await queue.put(message)
+                logger.debug(f"SSE进度已推送: {callback_task_id}, 事件类型: {event_type}")
+            else:
+                logger.warning(f"SSE连接不存在,跳过进度推送: {callback_task_id} - 任务继续执行")
+
+        except Exception as e:
+            logger.error(f"发送SSE进度消息失败: {callback_task_id}, 错误: {str(e)}")
+
+    async def trigger_callback(self, callback_task_id: str, current_data: dict):
+        """
+        触发指定任务的回调函数
+
+        Args:
+            callback_task_id: 回调任务ID
+            current_data: 传递给回调的数据
+
+        Returns:
+            bool: 回调是否成功触发
+        """
+        try:
+            callback_func = self.callbacks.get(callback_task_id)
+            if callback_func:
+                await callback_func(callback_task_id, current_data)
+                logger.debug(f"SSE回调执行成功: {callback_task_id}")
+                return True
+            else:
+                logger.debug(f"未找到SSE回调: {callback_task_id}, 已注册ID: {list(self.callbacks.keys())}")
+                return False
+
+        except Exception as e:
+            logger.error(f"SSE回调执行失败: {callback_task_id}, 错误: {str(e)}")
+            return False
+
+    def is_connected(self, callback_task_id: str) -> bool:
+        """检查SSE连接是否存在"""
+        return callback_task_id in self.connections
+
+    def is_callback_registered(self, callback_task_id: str) -> bool:
+        """检查回调函数是否已注册"""
+        return callback_task_id in self.callbacks
+
+    def get_connection_count(self) -> int:
+        """获取当前连接数"""
+        return len(self.connections)
+
+    def get_callback_count(self) -> int:
+        """获取当前回调数"""
+        return len(self.callbacks)
+
+    def get_stats(self) -> Dict[str, Any]:
+        """获取详细的统计信息"""
+        return {
+            "connections": {
+                "count": len(self.connections),
+                "ids": list(self.connections.keys())
+            },
+            "callbacks": {
+                "count": len(self.callbacks),
+                "ids": list(self.callbacks.keys())
+            },
+            "synchronized": len(self.connections) == len(self.callbacks)
+        }
+
+    async def clear_all(self):
+        """清理所有连接和回调"""
+        try:
+            connection_count = len(self.connections)
+            callback_count = len(self.callbacks)
+
+            self.connections.clear()
+            self.callbacks.clear()
+
+            logger.info(f"已清理所有SSE连接和回调: {connection_count}个连接, {callback_count}个回调")
+
+        except Exception as e:
+            logger.error(f"清理所有SSE连接和回调时出错: {str(e)}")
+
+    def register_callback_only(self, callback_task_id: str, callback_func: Callable):
+        """
+        仅注册回调函数(不建立连接)
+
+        Args:
+            callback_task_id: 回调任务ID
+            callback_func: 回调函数
+        """
+        self.callbacks[callback_task_id] = callback_func
+        logger.info(f"SSE回调已注册: {callback_task_id}, 当前回调数: {len(self.callbacks)}")
+
+    def unregister_callback_only(self, callback_task_id: str):
+        """
+        仅注销回调函数(不关闭连接)
+
+        Args:
+            callback_task_id: 回调任务ID
+        """
+        if callback_task_id in self.callbacks:
+            del self.callbacks[callback_task_id]
+            logger.info(f"SSE回调已注销: {callback_task_id}, 剩余回调数: {len(self.callbacks)}")
+        else:
+            logger.debug(f"SSE回调不存在: {callback_task_id}")
+
+
+# 创建全局单例实例
+unified_sse_manager = UnifiedSSEManager()

+ 2 - 2
core/construction_review/component/doc_worker/config/config.yaml

@@ -18,9 +18,9 @@ text_splitting:
   # 目标层级(默认按几级目录分类)
   # 目标层级(默认按几级目录分类)
   target_level: 1
   target_level: 1
   # 最大分块字符数
   # 最大分块字符数
-  max_chunk_size: 1100
+  max_chunk_size: 1500
   # 最小分块字符数
   # 最小分块字符数
-  min_chunk_size: 20
+  min_chunk_size: 800
   # 模糊匹配阈值(0-1)
   # 模糊匹配阈值(0-1)
   fuzzy_threshold: 0.80
   fuzzy_threshold: 0.80
 
 

+ 36 - 12
core/construction_review/workflows/ai_review_workflow.py

@@ -48,6 +48,7 @@ from langgraph.graph import StateGraph, END
 from langgraph.graph.message import add_messages
 from langgraph.graph.message import add_messages
 from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
 from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
 from foundation.logger.loggering import server_logger as logger
 from foundation.logger.loggering import server_logger as logger
+from foundation.base.redis_connection import RedisConnectionFactory
 from ..component import AIReviewEngine
 from ..component import AIReviewEngine
 
 
 # 常量定义
 # 常量定义
@@ -438,18 +439,12 @@ class AIReviewCoreFun:
                             result.technical_compliance
                             result.technical_compliance
                         )
                         )
 
 
-                        # # 正确统计:只统计真正存在的问题数量
-                        # issues_count = sum(
-                        #     1 for issue in issues
-                        #     for issue_data in issue.values()
-                        #     for review_item in issue_data.get("review_lists", [])
-                        #     if review_item.get("exist_issue", False)
-                        # )
                         current = int(((unit_index + 1) / total_units) * 100)
                         current = int(((unit_index + 1) / total_units) * 100)
 
 
                         # 立即发送单元审查详情(包含unit_review和processing_flag事件)
                         # 立即发送单元审查详情(包含unit_review和processing_flag事件)
                         await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
                         await self._send_unit_review_progress(state, unit_index, total_units, section_label, issues, current)
-
+                    else:
+                        logger.error(f"执行单个单元审查失败: {str(result.error_message)}")
                     return result
                     return result
 
 
             # 创建并发任务
             # 创建并发任务
@@ -659,7 +654,7 @@ class AIReviewCoreFun:
                                            total_units: int, section_label: str,
                                            total_units: int, section_label: str,
                                            issues_count: int) -> None:
                                            issues_count: int) -> None:
         """
         """
-        发送单元完成进度更新
+        发送单元完成进度更新 - 基于Redis分布式计数(多任务安全)
 
 
         Args:
         Args:
             state: AI审查状态
             state: AI审查状态
@@ -669,13 +664,42 @@ class AIReviewCoreFun:
             issues_count: 问题数量
             issues_count: 问题数量
         """
         """
         try:
         try:
-            current = int(((unit_index + 1) / total_units) * 100)
+            # 获取任务ID
+            task_id = state.get("callback_task_id", "")
+
+            # 尝试使用Redis进行分布式计数
+            redis_client = None
+            try:
+                redis_client = await RedisConnectionFactory.get_connection()
+            except Exception as e:
+                logger.warning(f"Redis连接失败,使用降级方案: {str(e)}")
+
+            if redis_client and task_id:
+                # 使用Redis分布式计数,避免多任务间的变量混淆
+                completed_key = f"ai_review:overall_task_progress:{task_id}:completed"
+
+                # 原子操作:添加单元索引到Redis集合(自动去重)
+                await redis_client.sadd(completed_key, str(unit_index))
+                await redis_client.expire(completed_key, 3600)  # 1小时过期,防止内存泄漏
+
+                # 获取实际完成的单元数量
+                completed_count = await redis_client.scard(completed_key)
+
+                # 基于实际完成数量计算进度,避免并发乱序导致的进度倒退
+                current = int((completed_count / total_units) * 100)
+
+                logger.info(f"Redis分布式进度更新: 任务{task_id} 完成数量{completed_count}/{total_units} 进度{current}%")
+            else:
+                # Redis连接失败时的降级方案(仅在单任务模式下安全)
+                logger.warning("Redis连接失败,使用降级进度计算方案(可能存在并发问题)")
+                current = int(((unit_index + 1) / total_units) * 100)
+                completed_count = unit_index + 1
 
 
             # 构建完成消息
             # 构建完成消息
             if issues_count > 0:
             if issues_count > 0:
-                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
+                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}(已发现{issues_count}个问题)"
             else:
             else:
-                message = f"已完成第 {unit_index + 1}/{total_units} 个单元: {section_label}"
+                message = f"已完成第 {completed_count}/{total_units} 个单元: {section_label}"
 
 
             logger.info(f"单元审查完成,更新进度: {current}% {message}")
             logger.info(f"单元审查完成,更新进度: {current}% {message}")
 
 

+ 1 - 1
foundation/base/celery_app.py

@@ -40,7 +40,7 @@ app.conf.update(
     enable_utc=True,
     enable_utc=True,
 
 
     # Worker配置
     # Worker配置
-    worker_prefetch_multiplier=1,  # 每个worker一次只取一个任务
+    worker_prefetch_multiplier=2,  # 每个worker一次只取一个任务
     task_acks_late=True,           # 任务完成后再确认
     task_acks_late=True,           # 任务完成后再确认
 
 
     # 并发控制
     # 并发控制

+ 31 - 1
foundation/base/redis_connection.py

@@ -20,7 +20,7 @@ except ImportError:
 # 导入Redis异常类
 # 导入Redis异常类
 from redis.exceptions import ConnectionError as redis_ConnectionError
 from redis.exceptions import ConnectionError as redis_ConnectionError
 
 
-from typing import Optional, Protocol, Dict, Any
+from typing import Optional, Protocol, Dict, Any, Set, Tuple
 from functools import wraps
 from functools import wraps
 import asyncio
 import asyncio
 from foundation.base.redis_config import RedisConfig
 from foundation.base.redis_config import RedisConfig
@@ -105,6 +105,16 @@ class RedisConnection(Protocol):
         int, list[str]]: ...
         int, list[str]]: ...
 
 
     async def eval(self, script: str, keys: list[str], args: list[str]) -> Any: ...
     async def eval(self, script: str, keys: list[str], args: list[str]) -> Any: ...
+
+    # 集合操作方法
+    async def sadd(self, key: str, *values: str) -> int: ...
+
+    async def scard(self, key: str) -> int: ...
+
+    async def srem(self, key: str, *values: str) -> int: ...
+
+    async def smembers(self, key: str) -> Set[str]: ...
+
     async def close(self) -> None: ...
     async def close(self) -> None: ...
 
 
 
 
@@ -224,6 +234,26 @@ class RedisAdapter(RedisConnection):
         """执行Redis脚本"""
         """执行Redis脚本"""
         return await self._redis.eval(script, numkeys, *keys_and_args) #  解包成独立参数
         return await self._redis.eval(script, numkeys, *keys_and_args) #  解包成独立参数
 
 
+    # 集合操作方法实现
+    @with_redis_retry()
+    async def sadd(self, key: str, *values: str) -> int:
+        """向集合添加成员,返回添加的成员数量"""
+        return await self._redis.sadd(key, *values)
+
+    @with_redis_retry()
+    async def scard(self, key: str) -> int:
+        """获取集合成员数量"""
+        return await self._redis.scard(key)
+
+    @with_redis_retry()
+    async def srem(self, key: str, *values: str) -> int:
+        """从集合删除成员,返回删除的成员数量"""
+        return await self._redis.srem(key, *values)
+
+    @with_redis_retry()
+    async def smembers(self, key: str) -> Set[str]:
+        """获取集合所有成员"""
+        return await self._redis.smembers(key)
 
 
     def get_langchain_redis_client(self):
     def get_langchain_redis_client(self):
         return self._langchain_redis_client
         return self._langchain_redis_client

+ 210 - 159
foundation/utils/utils.py

@@ -1,172 +1,217 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+'''
+@Project   : lq-agent-api
+@File      : utils.py
+@IDE       : VsCode
+@Author    :
+@Date      : 2025-12-04 10:13:12
+
+=================================
+
+📋 方法总览 (Method Overview)
+
+🏗️ 核心模型管理:
+├── ModelHandler()               # 模型处理器类
+├── get_models()                 # 获取模型的全局函数
+└── model_handler                # 全局模型处理器实例
+
+🔍 模型获取方法:
+├── _get_doubao_model()          # 获取豆包模型
+├── _get_qwen_model()            # 获取通义千问模型
+├── _get_deepseek_model()        # 获取DeepSeek模型
+├── _get_gemini_model()          # 获取Gemini模型
+├── _get_lq_qwen3_8b_model()     # 获取本地Qwen3-8B模型
+└── _get_qwen_local_14b_model()  # 获取本地Qwen3-14B模型
+'''
+
 from langchain_openai import ChatOpenAI
 from langchain_openai import ChatOpenAI
 from langchain_core.messages import HumanMessage
 from langchain_core.messages import HumanMessage
 
 
 from foundation.base.config import config_handler
 from foundation.base.config import config_handler
+from foundation.logger.loggering import server_logger as logger
 
 
 
 
 class ModelHandler:
 class ModelHandler:
+    """
+    AI模型处理器类,用于管理多种AI模型的创建和配置
+
+    支持的模型类型:
+    - doubao: 豆包模型
+    - qwen: 通义千问模型
+    - deepseek: DeepSeek模型
+    - gemini: Gemini模型
+    - lq_qwen3_8b: 本地Qwen3-8B模型
+    - qwen_local_14b: 本地Qwen3-14B模型
+    """
 
 
-	def __init__(self):
-		self.config = config_handler
-
-	def get_models(self):
-		"""
-			获取模型,默认为豆包
-		"""
-		model_type = self.config.get("model", "MODEL_TYPE")
-		if model_type == "doubao":
-			return self._get_doubao_model()
-		elif model_type == "qwen":
-			return self._get_qwen_model()
-		elif model_type == "deepseek":
-			return self._get_deepseek_model()
-		elif model_type == "qwen_local_1.5b":
-			return self._get_qwen_local_1_5b_model()
-		elif model_type == "qwen_local_14b":
-			return self._get_qwen_local_14b_model()
-		else:
-			# 默认返回豆包
-			return self._get_doubao_model()
-
-
-	def _get_doubao_model(self):
-		"""
-		获取豆包模型
-		"""
-		doubao_url = self.config.get("doubao", "DOUBAO_SERVER_URL")
-		doubao_model_id = self.config.get("doubao", "DOUBAO_MODEL_ID")
-		doubao_api_key = self.config.get("doubao", "DOUBAO_API_KEY")
-
-
-		llm = ChatOpenAI(
-			base_url=doubao_url,
-			model=doubao_model_id,
-			api_key=doubao_api_key,
-			temperature=0.7,
-			extra_body={
-				"enable_thinking": False,
-			})
-		
-		return llm
+    def __init__(self):
+        """
+        初始化模型处理器
+
+        加载配置处理器,用于后续读取各种模型的配置信息
+        """
+        self.config = config_handler
+
+    def get_models(self):
+        """
+        获取AI模型实例
+
+        Returns:
+            ChatOpenAI: 配置好的AI模型实例
+
+        Note:
+            根据配置文件中的MODEL_TYPE参数选择对应模型
+            支持的模型类型:doubao, qwen, deepseek, lq_qwen3_8b, qwen_local_14b
+            默认返回豆包模型
+        """
+        model_type = self.config.get("model", "MODEL_TYPE")
+        logger.info(f"正在初始化AI模型,模型类型: {model_type}")
+
+        if model_type == "doubao":
+            model = self._get_doubao_model()
+        if model_type == "gemini":
+            model = self._get_gemini_model()
+        elif model_type == "qwen":
+            model = self._get_qwen_model()
+        elif model_type == "deepseek":
+            model = self._get_deepseek_model()
+        elif model_type == "lq_qwen3_8b":
+            model = self._get_lq_qwen3_8b_model()
+        elif model_type == "qwen_local_14b":
+            model = self._get_qwen_local_14b_model()
+        else:
+            # 默认返回豆包
+            logger.warning(f"未知的模型类型 '{model_type}',使用默认gemini模型")
+            model = model = self._get_gemini_model()
+
+        logger.info(f"AI模型初始化完成: {model_type}")
+        return model
+
+
+    def _get_doubao_model(self):
+        """
+        获取豆包模型
+
+        Returns:
+            ChatOpenAI: 配置好的豆包模型实例
+        """
+        doubao_url = self.config.get("doubao", "DOUBAO_SERVER_URL")
+        doubao_model_id = self.config.get("doubao", "DOUBAO_MODEL_ID")
+        doubao_api_key = self.config.get("doubao", "DOUBAO_API_KEY")
+
+
+        llm = ChatOpenAI(
+            base_url=doubao_url,
+            model=doubao_model_id,
+            api_key=doubao_api_key,
+            temperature=0.7,
+            extra_body={
+                "enable_thinking": False,
+            })
+        
+        return llm
 
 
 	
 	
-	def _get_qwen_model(self):
-		"""
-		获取通义千问模型
-		"""
-		qwen_url = self.config.get("qwen", "QWEN_SERVER_URL")
-		qwen_model_id = self.config.get("qwen", "QWEN_MODEL_ID")
-		qwen_api_key = self.config.get("qwen", "QWEN_API_KEY")
-
-		print(f"Debug - qwen_url: {qwen_url}")
-		print(f"Debug - qwen_model_id: {qwen_model_id}")
-		print(f"Debug - qwen_api_key: {qwen_api_key[:10]}..." if qwen_api_key else "Debug - qwen_api_key: None")
-
-		llm = ChatOpenAI(
-			base_url=qwen_url,
-			model=qwen_model_id,
-			api_key=qwen_api_key,
-			temperature=0.7,
-			extra_body={
-				"enable_thinking": False,
-			})
-
-		return llm
-	
-	def _get_deepseek_model(self):
-		"""
-		获取通义千问模型
-		"""
-		qwen_url = self.config.get("qwen", "QWEN_SERVER_URL")
-		qwen_model_id = self.config.get("qwen", "QWEN_MODEL_ID")
-		qwen_api_key = self.config.get("qwen", "QWEN_API_KEY")
-
-		print(f"Debug - qwen_url: {qwen_url}")
-		print(f"Debug - qwen_model_id: {qwen_model_id}")
-		print(f"Debug - qwen_api_key: {qwen_api_key[:10]}..." if qwen_api_key else "Debug - qwen_api_key: None")
-
-		llm = ChatOpenAI(
-			base_url=qwen_url,
-			model=qwen_model_id,
-			api_key=qwen_api_key,
-			temperature=0.7,
-			extra_body={
-				"enable_thinking": False,
-			})
-
-		return llm
+    def _get_qwen_model(self):
+        """
+        获取通义千问模型
+
+        Returns:
+            ChatOpenAI: 配置好的通义千问模型实例
+        """
+        qwen_url = self.config.get("qwen", "QWEN_SERVER_URL")
+        qwen_model_id = self.config.get("qwen", "QWEN_MODEL_ID")
+        qwen_api_key = self.config.get("qwen", "QWEN_API_KEY")
+
+        llm = ChatOpenAI(
+            base_url=qwen_url,
+            model=qwen_model_id,
+            api_key=qwen_api_key,
+            temperature=0.7,
+            extra_body={
+                "enable_thinking": False,
+            })
+
+        return llm
 	
 	
-	def _get_deepseek_model(self):
-		"""
-		获取通义千问模型
-		"""
-		deepseek_url = self.config.get("deepseek", "DEEPSEEK_SERVER_URL")
-		deepseek_model_id = self.config.get("deepseek", "DEEPSEEK_MODEL_ID")
-		deepseek_api_key = self.config.get("deepseek", "DEEPSEEK_API_KEY")
-
-		print(f"Debug - deepseek_url: {deepseek_url}")
-		print(f"Debug - deepseek_model_id: {deepseek_model_id}")
-		print(f"Debug - deepseek_api_key: {deepseek_api_key[:10]}..." if deepseek_api_key else "Debug - deepseek_api_key: None")
-
-		llm = ChatOpenAI(
-			base_url=deepseek_url,
-			model=deepseek_model_id,
-			api_key=deepseek_api_key,
-			temperature=0.7,
-			extra_body={
-				"enable_thinking": False,
-			})
-
-		return llm
-	
-	def _get_gemini_model(self):
-		"""
-		获取通义千问模型
-		"""
-		gemini_url = self.config.get("gemini", "GEMINI_SERVER_URL")
-		gemini_model_id = self.config.get("gemini", "GEMINI_MODEL_ID")
-		gemini_api_key = self.config.get("gemini", "GEMINI_API_KEY")
-
-		print(f"Debug - gemini_url: {gemini_url}")
-		print(f"Debug - gemini_model_id: {gemini_model_id}")
-		print(f"Debug - gemini_api_key: {gemini_api_key[:10]}..." if gemini_api_key else "Debug - gemini_api_key: None")
-
-		llm = ChatOpenAI(
-			base_url=gemini_url,
-			model=gemini_model_id,
-			api_key=gemini_api_key,
-			temperature=0.7,
-			extra_body={
-				"enable_thinking": False,
-			})
-
-		return llm
-
-	def _get_qwen_local_1_5b_model(self):
-		"""
-		获取本地Qwen2.5-1.5B-Instruct模型
-		"""
-		llm = ChatOpenAI(
-			base_url="http://172.16.35.50:8000/v1",
-			model="Qwen2.5-1.5B-Instruct",
-			api_key="sk-dummy",  # 本地模型使用虚拟API key
-			temperature=0.7,
-		)
-
-		return llm
-
-	def _get_qwen_local_14b_model(self):
-		"""
-		获取本地Qwen3-14B模型
-		"""
-		llm = ChatOpenAI(
-			base_url="http://172.16.35.50:8003/v1",
-			model="Qwen3-14B",
-			api_key="sk-dummy",  # 本地模型使用虚拟API key
-			temperature=0.7,
-		)
-
-		return llm
+    def _get_deepseek_model(self):
+        """
+        获取DeepSeek模型
+
+        Returns:
+            ChatOpenAI: 配置好的DeepSeek模型实例
+        """
+        deepseek_url = self.config.get("deepseek", "DEEPSEEK_SERVER_URL")
+        deepseek_model_id = self.config.get("deepseek", "DEEPSEEK_MODEL_ID")
+        deepseek_api_key = self.config.get("deepseek", "DEEPSEEK_API_KEY")
+
+        llm = ChatOpenAI(
+            base_url=deepseek_url,
+            model=deepseek_model_id,
+            api_key=deepseek_api_key,
+            temperature=0.7,
+            extra_body={
+                "enable_thinking": False,
+            })
+
+        return llm
+
+    def _get_gemini_model(self):
+        """
+        获取Gemini模型
+
+        Returns:
+            ChatOpenAI: 配置好的Gemini模型实例
+        """
+        gemini_url = self.config.get("gemini", "GEMINI_SERVER_URL")
+        gemini_model_id = self.config.get("gemini", "GEMINI_MODEL_ID")
+        gemini_api_key = self.config.get("gemini", "GEMINI_API_KEY")
+
+        llm = ChatOpenAI(
+            base_url=gemini_url,
+            model=gemini_model_id,
+            api_key=gemini_api_key,
+            temperature=0.7,
+            # extra_body={
+            #     "enable_thinking": False,
+            # }
+            )
+
+        return llm
+
+    def _get_lq_qwen3_8b_model(self):
+        """
+        获取本地Qwen3-8B-Instruct模型
+
+        Returns:
+            ChatOpenAI: 配置好的本地Qwen3-8B模型实例
+        """
+        llm = ChatOpenAI(
+            base_url="http://192.168.91.253:9000/v1",
+            model="/mnt/Qwen3-8B",
+            api_key="dummy",  # 本地模型使用虚拟API key
+            temperature=0.7,
+        )
+
+        return llm
+
+    def _get_qwen_local_14b_model(self):
+        """
+        获取本地Qwen3-14B模型
+
+        Returns:
+            ChatOpenAI: 配置好的本地Qwen3-14B模型实例
+        """
+        llm = ChatOpenAI(
+            base_url="http://172.16.35.50:8003/v1",
+            model="Qwen3-14B",
+            api_key="sk-dummy",  # 本地模型使用虚拟API key
+            temperature=0.7,
+        )
+
+        return llm
 
 
 
 
 # 创建全局实例
 # 创建全局实例
@@ -175,7 +220,13 @@ model_handler = ModelHandler()
 def get_models():
 def get_models():
     """
     """
     获取模型的全局函数
     获取模型的全局函数
-    返回: (llm, chat, embed)
+
+    Returns:
+        tuple: (llm, chat, embed) - LLM模型、聊天模型和嵌入模型实例
+               注意:当前llm和chat使用相同模型实例,embed暂时返回None
+
+    Note:
+        这是一个便捷函数,直接使用全局model_handler实例获取模型
     """
     """
     llm = model_handler.get_models()
     llm = model_handler.get_models()
     # 暂时返回相同的模型作为chat和embed
     # 暂时返回相同的模型作为chat和embed

ファイルの差分が大きいため隠しています
+ 4 - 6
temp/AI审查结果.json


+ 0 - 244
test_complete_concurrent_fix.py

@@ -1,244 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-测试完整并发审查数据隔离修复效果
-验证所有修复是否能够有效解决 unit_review 事件混淆问题
-"""
-
-def test_subtask_id_generation():
-    """测试子任务ID生成逻辑"""
-    print("=== 测试子任务ID生成 ===")
-
-    callback_task_id = "main_task_123"
-
-    # 模拟并发单元索引
-    unit_indices = [0, 1, 2, 3, 4]
-
-    sub_task_ids = []
-    for unit_index in unit_indices:
-        sub_task_id = f"{callback_task_id}-unit-{unit_index}"
-        sub_task_ids.append(sub_task_id)
-        print(f"单元 {unit_index}: {sub_task_id}")
-
-    # 验证唯一性
-    unique_ids = set(sub_task_ids)
-    print(f"生成子任务ID数量: {len(sub_task_ids)}")
-    print(f"唯一子任务ID数量: {len(unique_ids)}")
-
-    if len(sub_task_ids) == len(unique_ids):
-        print(" 子任务ID唯一性验证通过")
-    else:
-        print(" 子任务ID存在重复")
-
-def test_progress_manager_isolation():
-    """测试ProgressManager数据隔离"""
-    print("\n=== 测试ProgressManager数据隔离 ===")
-
-    # 模拟不同的任务ID
-    task_ids = [
-        "main_task_123-unit-0",
-        "main_task_123-unit-1",
-        "main_task_123-unit-2"
-    ]
-
-    # 模拟不同的issues数据
-    issues_data = [
-        [{"location": "第3页:工程概况", "content": "问题1"}],
-        [{"location": "第5页:技术方案", "content": "问题2"}],
-        [{"location": "第7页:质量要求", "content": "问题3"}]
-    ]
-
-    # 模拟ProgressManager的行为
-    task_progress_store = {}
-
-    def simulate_update_stage_progress(callback_task_id, issues):
-        """模拟update_stage_progress方法"""
-        if callback_task_id not in task_progress_store:
-            task_progress_store[callback_task_id] = {}
-
-        task_progress_store[callback_task_id]["issues"] = issues
-        print(f"更新任务 {callback_task_id} 的issues: {len(issues)} 项")
-
-    def simulate_get_progress(callback_task_id):
-        """模拟get_progress方法"""
-        if callback_task_id in task_progress_store:
-            return task_progress_store[callback_task_id]
-        return None
-
-    # 测试并发更新
-    for i, task_id in enumerate(task_ids):
-        simulate_update_stage_progress(task_id, issues_data[i])
-
-    # 验证数据隔离
-    print("\n验证数据隔离效果:")
-    all_isolated = True
-    for i, task_id in enumerate(task_ids):
-        progress = simulate_get_progress(task_id)
-        if progress and "issues" in progress:
-            stored_issues = progress["issues"]
-            expected_issues = issues_data[i]
-
-            if stored_issues == expected_issues:
-                print(f" 任务 {task_id}: 数据隔离正确")
-            else:
-                print(f" 任务 {task_id}: 数据隔离失败")
-                print(f"   期望: {expected_issues}")
-                print(f"   实际: {stored_issues}")
-                all_isolated = False
-        else:
-            print(f" 任务 {task_id}: 无法获取数据")
-            all_isolated = False
-
-    return all_isolated
-
-def test_base_reviewer_parameter_handling():
-    """测试BaseReviewer参数处理"""
-    print("\n=== 测试BaseReviewer参数处理 ===")
-
-    def simulate_review_method(location_label=None):
-        """模拟修复后的review方法"""
-        if not location_label:
-            print(" 错误: location_label参数是必填的")
-            raise ValueError("location_label参数是必填项")
-
-        print(f" 使用参数传递的location_label: {location_label}")
-        return True
-
-    # 测试用例
-    test_cases = [
-        {"location": "第3页:工程概况", "should_pass": True},
-        {"location": None, "should_pass": False},
-        {"location": "第5页:技术方案", "should_pass": True}
-    ]
-
-    all_passed = True
-    for i, test_case in enumerate(test_cases):
-        print(f"\n测试用例 {i+1}: {test_case['location']}")
-        try:
-            result = simulate_review_method(test_case['location'])
-            if not test_case['should_pass']:
-                print(" 预期应该失败,但实际通过了")
-                all_passed = False
-            else:
-                print(" 测试通过")
-        except ValueError:
-            if test_case['should_pass']:
-                print(" 预期应该通过,但实际失败了")
-                all_passed = False
-            else:
-                print(" 测试通过 - 正确捕获了异常")
-
-    return all_passed
-
-def test_location_validation():
-    """测试位置验证逻辑"""
-    print("\n=== 测试位置验证逻辑 ===")
-
-    def validate_unit_issues(issues, expected_location, unit_index):
-        """模拟修复后的位置验证逻辑"""
-        validated_issues = []
-
-        for issue in issues:
-            issue_location = issue.get("location", "")
-            is_valid_location = False
-
-            if not issue_location:
-                is_valid_location = True
-                print(f"单元 {unit_index}: 接受location为空的问题")
-            elif issue_location == expected_location:
-                is_valid_location = True
-                print(f"单元 {unit_index}: location完全匹配 - {issue_location}")
-            elif expected_location in issue_location and len(issue_location.split(":")) == 2:
-                page_part, section_part = expected_location.split(":", 1)
-                if issue_location.startswith(page_part) and section_part in issue_location:
-                    is_valid_location = True
-                    print(f"单元 {unit_index}: location部分匹配验证通过 - {issue_location}")
-                else:
-                    print(f"单元 {unit_index}: location部分匹配验证失败 - 期望: {expected_location}, 实际: {issue_location}")
-            else:
-                print(f"单元 {unit_index}: 过滤掉位置不匹配的问题 - 期望: {expected_location}, 实际: {issue_location}")
-
-            if is_valid_location:
-                issue["unit_index"] = unit_index
-                issue["expected_location"] = expected_location
-                issue["validated"] = True
-                validated_issues.append(issue)
-
-        return validated_issues
-
-    # 测试用例
-    test_cases = [
-        {
-            "name": "完全匹配测试",
-            "issues": [
-                {"location": "第3页:工程概况", "content": "问题1"},
-                {"location": "第5页:技术方案", "content": "问题2"}  # 应该被过滤
-            ],
-            "expected_location": "第3页:工程概况",
-            "unit_index": 0,
-            "expected_count": 1
-        },
-        {
-            "name": "空location测试",
-            "issues": [
-                {"location": "", "content": "问题3"},
-                {"location": "第7页:质量要求", "content": "问题4"}  # 应该被过滤
-            ],
-            "expected_location": "第5页:技术方案",
-            "unit_index": 1,
-            "expected_count": 1
-        }
-    ]
-
-    all_passed = True
-    for test_case in test_cases:
-        print(f"\n--- {test_case['name']} ---")
-        validated = validate_unit_issues(
-            test_case["issues"],
-            test_case["expected_location"],
-            test_case["unit_index"]
-        )
-
-        print(f"原始问题数: {len(test_case['issues'])}, 验证通过: {len(validated)}")
-
-        if len(validated) == test_case["expected_count"]:
-            print(" 验证通过")
-        else:
-            print(" 验证失败")
-            all_passed = False
-
-    return all_passed
-
-if __name__ == "__main__":
-    print("开始测试完整并发审查数据隔离修复效果...")
-
-    # 运行所有测试
-    test_subtask_id_generation()
-    isolation_result = test_progress_manager_isolation()
-    base_reviewer_result = test_base_reviewer_parameter_handling()
-    validation_result = test_location_validation()
-
-    print("\n" + "="*50)
-    print("测试总结:")
-
-    print("\n🔧 修复要点:")
-    print("1.  修复了ProgressManager中重复的update_stage_progress方法")
-    print("2.  移除了自动清空issues的操作,避免并发数据干扰")
-    print("3.  为每个并发单元创建独立的子任务ID,实现数据隔离")
-    print("4. 强制BaseReviewer使用参数传递,移除实例变量回退")
-    print("5.  增强了位置验证逻辑,防止数据混淆")
-
-    print("\n 测试结果:")
-    print(f"ProgressManager数据隔离: {' 通过' if isolation_result else ' 失败'}")
-    print(f"BaseReviewer参数处理: {' 通过' if base_reviewer_result else ' 失败'}")
-    print(f"位置验证逻辑: {' 通过' if validation_result else ' 失败'}")
-
-    overall_result = isolation_result and base_reviewer_result and validation_result
-    print(f"\n总体测试结果: {' 全部通过' if overall_result else ' 存在问题'}")
-
-    if overall_result:
-        print("\n 修复成功!")
-        print("并发审查数据隔离问题已完全解决,unit_review事件混淆问题应该不会再出现。")
-    else:
-        print("\n 还有问题需要进一步修复。")

+ 42 - 50
views/construction_review/launch_review.py

@@ -19,6 +19,7 @@ from foundation.trace.trace_context import TraceContext, auto_trace
 from foundation.utils.redis_utils import get_file_info,store_file_info
 from foundation.utils.redis_utils import get_file_info,store_file_info
 from core.base.workflow_manager import WorkflowManager
 from core.base.workflow_manager import WorkflowManager
 from core.base.progress_manager import ProgressManager, sse_callback_manager
 from core.base.progress_manager import ProgressManager, sse_callback_manager
+from core.base.sse_manager import unified_sse_manager
 from views.construction_review.file_upload import validate_upload_parameters
 from views.construction_review.file_upload import validate_upload_parameters
 from .schemas.error_schemas import LaunchReviewErrors
 from .schemas.error_schemas import LaunchReviewErrors
 
 
@@ -34,53 +35,29 @@ progress_manager = ProgressManager()
 
 
 async def sse_progress_callback(callback_task_id: str, current_data: dict):
 async def sse_progress_callback(callback_task_id: str, current_data: dict):
     """SSE推送回调函数 - 接收进度更新并推送到客户端"""
     """SSE推送回调函数 - 接收进度更新并推送到客户端"""
-    await sse_manager.send_progress(callback_task_id, current_data)
+    await unified_sse_manager.send_progress(callback_task_id, current_data)
 
 
 class SimpleSSEManager:
 class SimpleSSEManager:
-    """SSE连接管理器 - 管理客户端SSE连接和消息推送"""
-
-    def __init__(self):
-        self.connections: Dict[str, asyncio.Queue] = {}
-
-    async def connect(self, callback_task_id: str):
-        """建立SSE连接 - 创建消息队列并发送连接确认"""
-        queue = asyncio.Queue()
-        self.connections[callback_task_id] = queue
+    """
+    SSE连接管理器 - 兼容性包装器,委托给统一SSE管理器
 
 
-        await queue.put({
-            "type": "connection_established",
-            "callback_task_id": callback_task_id,
-            "timestamp": datetime.now().isoformat()
-        })
+    注意: 此类保持向后兼容,建议直接使用 unified_sse_manager
+    """
 
 
-        logger.info(f"SSE连接: {callback_task_id}")
-        return queue
+    async def connect(self, callback_task_id: str, callback_func=None):
+        """建立SSE连接"""
+        return await unified_sse_manager.establish_connection(callback_task_id, callback_func)
 
 
     async def disconnect(self, callback_task_id: str):
     async def disconnect(self, callback_task_id: str):
-        """断开SSE连接 - 清理连接队列"""
-        if callback_task_id in self.connections:
-            del self.connections[callback_task_id]
-        logger.info(f"SSE连接已断开: {callback_task_id}")
+        """断开SSE连接"""
+        await unified_sse_manager.close_connection(callback_task_id)
 
 
     async def send_progress(self, callback_task_id: str, current_data: dict):
     async def send_progress(self, callback_task_id: str, current_data: dict):
-        """发送进度更新 - 将进度数据放入队列推送给客户端"""
-        queue = self.connections.get(callback_task_id)
-        if queue:
-            # 优先使用progress_manager传递的event_type,如果没有则使用默认逻辑
-            event_type = current_data.get("event_type", "processing")
-            # 处理特殊的单元审查事件
-            if event_type == "unit_review" or (event_type == "processing" and current_data.get("status") == "unit_review_update"):
-                event_type = "unit_review_update"
-
-            await queue.put({
-                "type": event_type,
-                "data": current_data,
-                "timestamp": datetime.now().isoformat()
-            })
-            logger.debug(f"SSE进度已推送: {callback_task_id}, 事件类型: {event_type}")
-        else:
-            logger.warning(f"SSE连接已断开,跳过进度推送: {callback_task_id} - AI审查任务继续执行")
+        """发送进度消息"""
+        await unified_sse_manager.send_progress(callback_task_id, current_data)
+
 
 
+# 创建兼容性实例
 sse_manager = SimpleSSEManager()
 sse_manager = SimpleSSEManager()
 
 
 def format_sse_event(event_type: str, data: str) -> str:
 def format_sse_event(event_type: str, data: str) -> str:
@@ -207,9 +184,8 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
     # 验证倾向性审查角色
     # 验证倾向性审查角色
     validate_tendency_review_role(tendency_review_role)
     validate_tendency_review_role(tendency_review_role)
 
 
-    # 注册SSE回调
-    sse_callback_manager.register_callback(callback_task_id, sse_progress_callback)
-    queue = await sse_manager.connect(callback_task_id)
+    # 使用统一SSE管理器建立连接并注册回调
+    queue = await unified_sse_manager.establish_connection(callback_task_id, sse_progress_callback)
 
 
     async def generate_launch_review_events():
     async def generate_launch_review_events():
         """生成启动审查SSE事件流"""
         """生成启动审查SSE事件流"""
@@ -349,22 +325,30 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
                             unified_data_json = json.dumps(unified_data, ensure_ascii=False)
                             unified_data_json = json.dumps(unified_data, ensure_ascii=False)
                             yield format_sse_event(sse_event_type, unified_data_json)
                             yield format_sse_event(sse_event_type, unified_data_json)
 
 
-                        # 检查SSE回调是否已被注销(作为任务结束信号)
-                        if not sse_callback_manager.is_callback_registered(callback_task_id):
-                            logger.info(f"检测到SSE回调已注销,任务结束: {callback_task_id}")
-                            # 推送最终的completed事件
+                        # 调试日志:记录接收到的消息
+                        logger.debug(f"收到消息 - 类型: {message_type}, 数据: {current_data}")
+
+                        # 特殊处理:跳过连接建立消息,避免误判为完成
+                        if message_type == "connection_established":
+                            logger.info(f"收到连接建立消息,继续监听: {callback_task_id}")
+                            continue
+
+                        # 特殊处理:收到连接关闭信号,立即结束SSE流
+                        if message_type == "connection_closed":
                             completion_data = {
                             completion_data = {
                                 "callback_task_id": callback_task_id,
                                 "callback_task_id": callback_task_id,
                                 "user_id": user_id,
                                 "user_id": user_id,
                                 "current": 100,
                                 "current": 100,
                                 "stage_name": "审查完成",
                                 "stage_name": "审查完成",
                                 "status": "completed",
                                 "status": "completed",
-                                "message": "施工审查方案处理完成!",
-                                "overall_task_status":  "completed",
+                                "message": f"施工审查方案处理完成!",
+                                "overall_task_status": "completed",
                                 "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()),
                                 "updated_at": current_data.get("updated_at", int(time.time())) if current_data else int(time.time()),
                             }
                             }
                             completion_json = json.dumps(completion_data, ensure_ascii=False)
                             completion_json = json.dumps(completion_data, ensure_ascii=False)
                             yield format_sse_event("completed", completion_json)
                             yield format_sse_event("completed", completion_json)
+                            logger.info(f"收到连接关闭信号,结束SSE流: {callback_task_id}")
+                            logger.info(f"SSE状态: SSE回调已注销")
                             break
                             break
 
 
                     except Exception as e:
                     except Exception as e:
@@ -428,9 +412,17 @@ async def launch_review_sse(request_data: LaunchReviewRequest):
             yield format_sse_event("error", error_data)
             yield format_sse_event("error", error_data)
 
 
         finally:
         finally:
-            # 清理回调连接
-            sse_callback_manager.unregister_callback(callback_task_id)
-            await sse_manager.disconnect(callback_task_id)
+            # 清理回调连接(确保资源被正确释放)
+            try:
+                sse_callback_manager.unregister_callback(callback_task_id)
+            except Exception as cleanup_error:
+                logger.warning(f"清理回调连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
+
+            try:
+                await unified_sse_manager.close_connection(callback_task_id)
+            except Exception as cleanup_error:
+                logger.warning(f"断开SSE连接时出错: {callback_task_id}, 错误: {str(cleanup_error)}")
+
             logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
             logger.debug(f"启动审查SSE流已结束: {callback_task_id}")
 
 
     return StreamingResponse(
     return StreamingResponse(

この差分においてかなりの量のファイルが変更されているため、一部のファイルを表示していません