Explorar el Código

v0.0.4-功能优化
- 专业性审查使用qwen3_30B模型

WangXuMing hace 3 meses
padre
commit
3acbfeed1f

+ 10 - 3
core/base/workflow_manager.py

@@ -362,12 +362,19 @@ class WorkflowManager:
             return False
 
         except RuntimeError as e:
-            # 事件循环关闭是正常情况(任务结束),不记录错误
-            if "Event loop is closed" in str(e):
+            # 事件循环相关的错误处理
+            error_msg = str(e)
+            if "Event loop is closed" in error_msg:
+                # 事件循环关闭是正常情况(任务结束),不记录错误
                 logger.debug(f"检查终止信号时事件循环已关闭: {callback_task_id}")
                 return False
+            elif "bound to a different event loop" in error_msg:
+                # 事件循环不匹配,记录警告但不中断流程
+                logger.warning(f"检查终止信号时检测到事件循环不匹配: {callback_task_id},将忽略本次检查")
+                return False
             else:
-                logger.error(f"检查终止信号失败: {str(e)}", exc_info=True)
+                # 其他 RuntimeError 记录错误
+                logger.error(f"检查终止信号失败(RuntimeError): {error_msg}", exc_info=True)
                 return False
         except Exception as e:
             # 其他异常仍然记录错误

+ 2 - 2
core/construction_review/component/ai_review_engine.py

@@ -998,7 +998,7 @@ class AIReviewEngine(BaseReviewer):
             combined_content = review_content
 
         return await self.review("non_parameter_compliance_check", trace_id, reviewer_type, prompt_name, combined_content, review_references,
-                               reference_source, review_location_label, state, stage_name, timeout=45)
+                               reference_source, review_location_label, state, stage_name, timeout=45, model_name="qwen3_30b")
 
     async def check_parameter_compliance(self, trace_id_idx: str, review_content: str, review_references: str,
                                         reference_source: str, review_location_label: str, state: str, stage_name: str,
@@ -1032,7 +1032,7 @@ class AIReviewEngine(BaseReviewer):
             combined_content = review_content
 
         return await self.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, combined_content, review_references,
-                               reference_source, review_location_label, state, stage_name, timeout=45)
+                               reference_source, review_location_label, state, stage_name, timeout=45, model_name="qwen3_30b")
 
     async def outline_check(self, trace_id_idx: str, outline_content: Dict[str, Any],
                                    state:dict,stage_name:str) -> Dict[str, Any]:

+ 18 - 5
core/construction_review/component/reviewers/base_reviewer.py

@@ -37,7 +37,7 @@ class BaseReviewer(ABC):
     
     #@obverse
     async def  review(self, name: str, trace_id: str, reviewer_type: str, prompt_name: str, review_content: str, review_references: str = None,
-                    reference_source: str = None, review_location_label: str = None,state:str =None,stage_name:str = None, timeout: int = 60) -> ReviewResult:
+                    reference_source: str = None, review_location_label: str = None,state:str =None,stage_name:str = None, timeout: int = 60, model_name: str = None) -> ReviewResult:
         """
         执行审查
 
@@ -52,9 +52,16 @@ class BaseReviewer(ABC):
                 - ai: professional_suggestion, standardization_suggestion, completeness_suggestion, readability_suggestion
             review_content: 待审查内容 (必填)
             review_references: 审查参考内容 (可选)
-            stage_name: 阶段名称 (可选,用于进度更新)
+            reference_source: 参考来源 (可选)
+            review_location_label: 审查位置标签 (可选)
             state: 状态字典 (可选,用于进度更新)
+            stage_name: 阶段名称 (可选,用于进度更新)
             timeout: 模型调用超时时间,默认60秒 (可选)
+            model_name: 模型名称 (可选),支持动态切换模型
+                      支持的模型:doubao, qwen, qwen3_30b, deepseek, gemini,
+                                  lq_qwen3_8b, lq_qwen3_8b_lq_lora,
+                                  lq_qwen3_4b, qwen_local_14b
+                      如果为None,则使用配置文件中的默认模型
 
 
         Returns:
@@ -62,7 +69,12 @@ class BaseReviewer(ABC):
         """
         start_time = time.time()
         try:
-            logger.info(f"开始执行 {name} 审查,trace_id: {trace_id},内容长度: {len(review_content)}")
+            # 记录使用的模型
+            if model_name:
+                logger.info(f"开始执行 {name} 审查,trace_id: {trace_id}, 模型: {model_name}, 内容长度: {len(review_content)}")
+            else:
+                logger.info(f"开始执行 {name} 审查,trace_id: {trace_id}, 使用默认模型, 内容长度: {len(review_content)}")
+
             prompt_kwargs = {}
             prompt_kwargs["review_content"] = review_content
             prompt_kwargs["review_references"] = review_references or ""
@@ -77,11 +89,12 @@ class BaseReviewer(ABC):
                 "task_name": name
             }
 
-            # 调用模型
+            # 调用模型,传递model_name参数
             model_response = await self.model_client.get_model_generate_invoke(
                 trace_id=trace_id,
                 task_prompt_info=task_prompt_info,
-                timeout=timeout
+                timeout=timeout,
+                model_name=model_name
             )
             if reference_source:
                 result = self.format_result(model_response, name, reference_source, review_references)

+ 27 - 7
foundation/ai/agent/generate/model_generate.py

@@ -9,7 +9,7 @@
 '''
 
 from langchain_core.prompts import ChatPromptTemplate
-from foundation.ai.models.model_handler import get_models
+from foundation.ai.models.model_handler import model_handler
 from foundation.observability.logger.loggering import server_logger as logger
 import asyncio
 import time
@@ -21,16 +21,18 @@ class GenerateModelClient:
     """
 
     def __init__(self, default_timeout: int = 60, max_retries: int = 3, backoff_factor: float = 1.0):
-        # 获取部署的模型列表
-        llm, chat, embed = get_models()
-        self.llm = llm
-        self.chat = chat
+        # 获取默认模型
+        self.llm = model_handler.get_models()
+        self.chat = self.llm  # 当前chat和llm使用相同模型
 
         # 配置参数
         self.default_timeout = default_timeout
         self.max_retries = max_retries
         self.backoff_factor = backoff_factor
 
+        # 保存model_handler引用,用于动态获取模型
+        self.model_handler = model_handler
+
     async def _retry_with_backoff(self, func: Callable, *args, timeout: Optional[int] = None, **kwargs):
         """
         带指数退避的重试机制,每次重试都有独立的超时控制
@@ -61,14 +63,32 @@ class GenerateModelClient:
                 logger.warning(f"[模型调用] 第 {attempt + 1} 次尝试失败: {str(e)}, {wait_time}秒后重试...")
                 await asyncio.sleep(wait_time)
 
-    async def get_model_generate_invoke(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None):
+    async def get_model_generate_invoke(self, trace_id: str, task_prompt_info: dict, timeout: Optional[int] = None, model_name: Optional[str] = None):
         """
             模型非流式生成(异步)
+
+            Args:
+                trace_id: 追踪ID
+                task_prompt_info: 任务提示词信息
+                timeout: 超时时间(可选)
+                model_name: 模型名称(可选),支持动态切换模型
+                          支持的模型:doubao, qwen, qwen3_30b, deepseek, gemini,
+                                      lq_qwen3_8b, lq_qwen3_8b_lq_lora,
+                                      lq_qwen3_4b, qwen_local_14b
+                          如果为None,则使用默认模型
         """
         start_time = time.time()
         current_timeout = timeout or self.default_timeout
 
         try:
+            # 根据model_name选择对应的模型
+            if model_name:
+                llm_to_use = self.model_handler.get_model_by_name(model_name)
+                logger.info(f"[模型调用] 使用指定模型: {model_name}, trace_id: {trace_id}")
+            else:
+                llm_to_use = self.llm
+                logger.info(f"[模型调用] 使用默认模型, trace_id: {trace_id}")
+
             logger.info(f"[模型调用] 开始处理 trace_id: {trace_id}, 超时配置: {current_timeout}s")
 
             prompt_template = task_prompt_info["task_prompt"]
@@ -76,7 +96,7 @@ class GenerateModelClient:
 
             async def _invoke_model():
                 loop = asyncio.get_event_loop()
-                return await loop.run_in_executor(None, self.llm.invoke, messages)
+                return await loop.run_in_executor(None, llm_to_use.invoke, messages)
 
             # 调用带重试机制的方法,超时控制在重试机制内部处理
             response = await self._retry_with_backoff(_invoke_model, timeout=current_timeout)

+ 132 - 0
foundation/ai/models/model_handler.py

@@ -159,6 +159,8 @@ class ModelHandler:
                 model = self._get_gemini_model()
             elif model_type == "qwen":
                 model = self._get_qwen_model()
+            elif model_type == "qwen3_30b":
+                model = self._get_qwen3_30b_model()
             elif model_type == "deepseek":
                 model = self._get_deepseek_model()
             elif model_type == "lq_qwen3_8b":
@@ -199,6 +201,87 @@ class ModelHandler:
             # 如果所有模型都失败,抛出异常
             raise ModelConnectionError(f"无法初始化任何模型服务: {e}")
 
+    def get_model_by_name(self, model_type: str = None):
+        """
+        根据模型名称动态获取指定的AI模型实例
+
+        Args:
+            model_type: 模型类型名称,如果为None则使用配置文件中的默认模型
+                       支持的模型类型:doubao, qwen, qwen3_30b, deepseek, gemini,
+                                     lq_qwen3_8b, lq_qwen3_8b_lq_lora,
+                                     lq_qwen3_4b, qwen_local_14b
+
+        Returns:
+            ChatOpenAI: 配置好的AI模型实例
+
+        Note:
+            该方法支持动态切换模型,不受配置文件中的默认MODEL_TYPE限制
+            如果model_type为None,则使用配置文件中的默认模型
+            如果model_type无效,则使用gemini作为降级模型
+        """
+        # 如果未指定模型类型,使用配置文件中的默认模型
+        if model_type is None:
+            model_type = self.config.get("model", "MODEL_TYPE")
+
+        logger.info(f"动态获取AI模型,模型类型: {model_type}")
+
+        # 检查缓存
+        cache_key = f"chat_{model_type}"
+        if cache_key in self._model_cache:
+            logger.info(f"使用缓存的模型: {model_type}")
+            return self._model_cache[cache_key]
+
+        model = None
+
+        try:
+            if model_type == "doubao":
+                model = self._get_doubao_model()
+            elif model_type == "gemini":
+                model = self._get_gemini_model()
+            elif model_type == "qwen":
+                model = self._get_qwen_model()
+            elif model_type == "qwen3_30b":
+                model = self._get_qwen3_30b_model()
+            elif model_type == "deepseek":
+                model = self._get_deepseek_model()
+            elif model_type == "lq_qwen3_8b":
+                model = self._get_lq_qwen3_8b_model()
+            elif model_type == "lq_qwen3_8b_lq_lora":
+                model = self._get_lq_qwen3_8b_lora_model()
+            elif model_type == "lq_qwen3_4b":
+                model = self._get_lq_qwen3_4b_model()
+            elif model_type == "qwen_local_14b":
+                model = self._get_qwen_local_14b_model()
+            else:
+                # 默认返回gemini
+                logger.warning(f"未知的模型类型 '{model_type}',使用默认gemini模型")
+                model = self._get_gemini_model()
+
+            if model:
+                self._model_cache[cache_key] = model
+                logger.info(f"AI模型动态初始化完成: {model_type}")
+                return model
+            else:
+                raise ModelAPIError(f"模型初始化返回None: {model_type}")
+
+        except Exception as e:
+            logger.error(f"动态获取模型失败 [{model_type}]: {e}")
+
+            # 尝试使用gemini作为降级方案
+            if model_type != "gemini":
+                logger.info("尝试使用Gemini模型作为降级方案")
+                try:
+                    fallback_model = self._get_gemini_model()
+                    if fallback_model:
+                        self._model_cache[cache_key] = fallback_model
+                        logger.warning(f"已切换到Gemini降级模型")
+                        return fallback_model
+                except Exception as fallback_error:
+                    logger.error(f"降级模型也失败: {fallback_error}")
+
+            # 如果所有模型都失败,抛出异常
+            raise ModelConnectionError(f"无法初始化任何模型服务: {e}")
+
     def get_embedding_model(self):
         """
         获取Embedding模型实例
@@ -341,6 +424,55 @@ class ModelHandler:
             error = ModelAPIError(f"通义千问模型初始化异常: {e}")
             return self._handle_model_error("qwen", error)
 
+    def _get_qwen3_30b_model(self):
+        """
+        获取Qwen3-30B模型
+
+        Returns:
+            ChatOpenAI: 配置好的Qwen3-30B模型实例
+        """
+        try:
+            qwen3_30b_url = self.config.get("qwen3_30b", "QWEN3_30B_SERVER_URL")
+            qwen3_30b_model_id = self.config.get("qwen3_30b", "QWEN3_30B_MODEL_ID")
+            qwen3_30b_api_key = self.config.get("qwen3_30b", "QWEN3_30B_API_KEY")
+
+            # 验证配置完整性
+            if not all([qwen3_30b_url, qwen3_30b_model_id, qwen3_30b_api_key]):
+                missing = []
+                if not qwen3_30b_url:
+                    missing.append("QWEN3_30B_SERVER_URL")
+                if not qwen3_30b_model_id:
+                    missing.append("QWEN3_30B_MODEL_ID")
+                if not qwen3_30b_api_key:
+                    missing.append("QWEN3_30B_API_KEY")
+                raise ModelConfigError(f"Qwen3-30B模型配置不完整,缺少: {', '.join(missing)}")
+
+            # 检查连接
+            if not self._check_connection(qwen3_30b_url, qwen3_30b_api_key):
+                logger.warning(f"Qwen3-30B模型服务连接失败: {qwen3_30b_url}")
+                raise ModelConnectionError(f"无法连接到Qwen3-30B模型服务: {qwen3_30b_url}")
+
+            llm = ChatOpenAI(
+                base_url=qwen3_30b_url,
+                model=qwen3_30b_model_id,
+                api_key=qwen3_30b_api_key,
+                temperature=0.7,
+                timeout=self.REQUEST_TIMEOUT,
+                extra_body={
+                    "enable_thinking": False,
+                })
+
+            logger.info(f"Qwen3-30B模型初始化成功: {qwen3_30b_model_id}")
+            return llm
+
+        except ModelConfigError:
+            raise
+        except ModelConnectionError:
+            raise
+        except Exception as e:
+            error = ModelAPIError(f"Qwen3-30B模型初始化异常: {e}")
+            return self._handle_model_error("qwen3_30b", error)
+
     def _get_deepseek_model(self):
         """
         获取DeepSeek模型

+ 37 - 2
foundation/infrastructure/cache/redis_connection.py

@@ -303,20 +303,50 @@ class RedisConnectionFactory:
     """
     _connections: Dict[str, RedisConnection] = {}
     _stores: Dict[str, RedisStore] = {}
+    _connection_loops: Dict[str, asyncio.AbstractEventLoop] = {}  # 记录每个连接的事件循环
 
     @classmethod
     async def get_connection(cls) -> RedisConnection:
-        """获取Redis连接(单例模式)"""
+        """获取Redis连接(单例模式,支持事件循环检测)"""
         # 加载配置
         redis_config = load_config_from_env()
         #_get_redis_logger().info(f"redis_config={redis_config}")
         # 使用配置参数生成唯一标识
         conn_id = f"{redis_config.url}-{redis_config.db}"
 
+        # 获取当前事件循环
+        try:
+            current_loop = asyncio.get_running_loop()
+        except RuntimeError:
+            # 如果没有运行的事件循环,创建一个新的
+            current_loop = asyncio.new_event_loop()
+            asyncio.set_event_loop(current_loop)
+
+        # 检查连接是否存在以及事件循环是否匹配
+        if conn_id in cls._connections:
+            stored_loop = cls._connection_loops.get(conn_id)
+            if stored_loop != current_loop:
+                # 事件循环不匹配,需要重新创建连接
+                _get_redis_logger().warning(
+                    f"检测到事件循环变化,重新创建Redis连接: {conn_id}"
+                )
+                # 关闭旧连接
+                try:
+                    await cls._connections[conn_id].close()
+                except Exception as e:
+                    _get_redis_logger().debug(f"关闭旧Redis连接时出错: {e}")
+                # 删除旧连接
+                del cls._connections[conn_id]
+                del cls._connection_loops[conn_id]
+
+        # 创建新连接
         if conn_id not in cls._connections:
             adapter = RedisAdapter(redis_config)
             await adapter.connect()
             cls._connections[conn_id] = adapter
+            cls._connection_loops[conn_id] = current_loop
+            _get_redis_logger().info(f"创建新的Redis连接: {conn_id}")
+
         return cls._connections[conn_id]
 
     @classmethod
@@ -349,8 +379,13 @@ class RedisConnectionFactory:
     async def close_all(cls):
         """关闭所有Redis连接"""
         for conn in cls._connections.values():
-            await conn.close()
+            try:
+                await conn.close()
+            except Exception as e:
+                _get_redis_logger().debug(f"关闭Redis连接时出错: {e}")
         cls._connections = {}
+        cls._connection_loops = {}  # 同时清理事件循环记录
+        cls._stores = {}
 
     @classmethod
     def get_connection_count(cls) -> int: