Bläddra i källkod

fix: 修复 Event loop is closed 与 Milvus field not exist 两个 Bug

- AsyncMySQLPool.initialize() 增加事件循环存活检测,解决单例池因
  loop.close() 导致下次复用时报 RuntimeError: Event loop is closed
- parent_tool.py 默认 output_fields 移除不存在的 file_name/title 字段,
  解决 MilvusException: field file_name not exist

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 1 vecka sedan
förälder
incheckning
d38efb2f0e

+ 8 - 4
core/construction_review/component/infrastructure/parent_tool.py

@@ -5,19 +5,20 @@
 @Project   : lq-agent-api
 @File      : parent_tool.py
 @IDE       : VsCode
-@Author    : 王旭明
+@Author    : wandaan
 @Date      : 2025-12-30
 @Description: 父文档增强工具 - 提供父文档查询和拼接功能
 """
 
 from typing import Any, Dict, List, Optional
 
+from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import review_logger as logger
 
 
 # =============================== 配置 ===============================
 
-PARENT_COLLECTION = "rag_parent_hybrid"  # 父文档集合名称
+PARENT_COLLECTION = config_handler.get('rag_collections', 'PARENT_COLLECTION', 'rag_parent_hybrid')
 PARENT_OUTPUT_FIELDS = ["parent_id", "text"]  # 父文档查询字段
 
 
@@ -66,7 +67,7 @@ def fetch_parent_document(
 def fetch_parent_chunks_by_parent_id(
     milvus_manager,
     parent_id: str,
-    collection_name: str = "rag_parent_hybrid",  # 子块集合
+    collection_name: str = None,  # 从配置读取
     output_fields: List[str] = None
 ) -> Optional[List[Dict[str, Any]]]:
     """
@@ -82,7 +83,10 @@ def fetch_parent_chunks_by_parent_id(
         子块片段列表,按 pk 排序,如果不存在返回 None
     """
     if output_fields is None:
-        output_fields = ["pk", "text", "parent_id", "file_name", "title"]
+        output_fields = ["pk", "text", "parent_id"]
+
+    if collection_name is None:
+        collection_name = config_handler.get('rag_collections', 'PARENT_COLLECTION', 'rag_parent_hybrid')
 
     try:
         rows = milvus_manager.condition_query(

+ 7 - 1
foundation/infrastructure/mysql/async_mysql_conn_pool.py

@@ -23,7 +23,13 @@ class AsyncMySQLPool:
     async def initialize(self, max_retries=3, retry_delay=2):
         """初始化连接池,支持重试。已初始化且连接池健康时直接返回。"""
         if self._initialized and self._pool and not self._pool._closed:
-            return
+            if self._pool._loop and not self._pool._loop.is_closed():
+                return
+            server_logger.info("事件循环已关闭,重新初始化连接池...")
+            self._pool.close()
+            await self._pool.wait_closed()
+            self._pool = None
+            self._initialized = False
 
         # 关闭旧池(如果有)
         if self._pool and not self._pool._closed: