Bläddra i källkod

fix: 修复 Milvus field not exist 与日志追踪问题

修复 Milvus 向量检索中字段缺失导致的查询失败,增强日志记录与链路追踪上下文传递。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
WangXuMing 1 vecka sedan
förälder
incheckning
ede6e9a389

+ 44 - 45
foundation/database/base/vector/milvus_vector.py

@@ -23,7 +23,7 @@ def _get_logger():
     global logger
     if logger is None:
         try:
-            from foundation.observability.logger.loggering import server_logger
+            from foundation.observability.logger.loggering import review_logger as server_logger
             logger = server_logger
         except ImportError:
             # 如果导入失败,创建一个简单的logger替代品
@@ -67,7 +67,7 @@ class MilvusVectorManager(BaseVectorDB):
         self.connection_args = {
             "uri": f"http://{self.host}:{self.port}",
             "user": self.user,
-            "db_name": "lq_db"
+            "db_name": self.milvus_db
         }
         if self.password:
             self.connection_args["password"] = self.password
@@ -77,13 +77,13 @@ class MilvusVectorManager(BaseVectorDB):
 
         # 预创建常用的vectorstore连接,避免运行时竞争
         self._vectorstore_cache = {}
+        self._collection_cache = {}  # 缓存 pymilvus Collection 对象
         self._create_common_connections()
 
     def _create_common_connections(self):
         """预创建常用的vectorstore连接"""
         common_collections = [
-            "first_bfp_collection_entity",
-            "first_bfp_collection_test"
+            config_handler.get('rag_collections', 'ENTITY_COLLECTION', 'first_bfp_collection_entity')
         ]
 
         # 抑制 AsyncMilvusClient 的警告日志
@@ -109,6 +109,12 @@ class MilvusVectorManager(BaseVectorDB):
         finally:
             logging.getLogger('pymilvus').setLevel(original_level)
 
+    def _get_collection(self, collection_name: str) -> Collection:
+        """获取缓存的 Collection 对象,避免重复创建"""
+        if collection_name not in self._collection_cache:
+            self._collection_cache[collection_name] = Collection(collection_name)
+        return self._collection_cache[collection_name]
+
     def text_to_vector(self, text: str) -> List[float]:
         """
         将文本转换为向量(重写基类方法,直接使用嵌入模型)
@@ -122,34 +128,42 @@ class MilvusVectorManager(BaseVectorDB):
             raise
     
     def connect(self):
-        """连接到 Milvus 服务器
-        ,
-                password=self.password
-                alias="default",
-        """
+        """连接到 Milvus 服务器"""
         try:
             connections.connect(
                 alias="default",
                 host=self.host,
                 port=self.port,
                 user=self.user,
-                db_name="lq_db"
+                password=self.password,
+                db_name=self.milvus_db
             )
             _get_logger().info(f"Connected to Milvus at {self.host}:{self.port}")
         except Exception as e:
             _get_logger().error(f"Failed to connect to Milvus: {e}")
             raise
     
-    def create_collection(self, collection_name: str, dimension: int = 768, 
-                         description: str = "Vector collection for text embeddings"):
+    def create_collection(self, collection_name: str, dimension: int = 768,
+                         description: str = "Vector collection for text embeddings",
+                         allow_drop: bool = False):
         """
         创建向量集合
+
+        Args:
+            collection_name: 集合名称
+            dimension: 向量维度
+            description: 集合描述
+            allow_drop: 是否允许删除已有集合。默认 False,集合已存在时直接返回。
         """
         try:
             # 检查集合是否已存在
             if utility.has_collection(collection_name):
-                _get_logger().info(f"Collection {collection_name} already exists")
+                if not allow_drop:
+                    _get_logger().info(f"Collection {collection_name} already exists, skip creation (allow_drop=False)")
+                    return
+                _get_logger().warning(f"Collection {collection_name} already exists, dropping (allow_drop=True)")
                 utility.drop_collection(collection_name)
+                self._collection_cache.pop(collection_name, None)
                 _get_logger().info(f"Collection '{collection_name}' dropped successfully")
                 
             
@@ -199,7 +213,7 @@ class MilvusVectorManager(BaseVectorDB):
             collection_name = param.get('collection_name')
             text = document.get('content')
             metadata = document.get('metadata')
-            collection = Collection(collection_name)
+            collection = self._get_collection(collection_name)
             created_at = None
             
             # 转换文本为向量
@@ -237,8 +251,8 @@ class MilvusVectorManager(BaseVectorDB):
         """
         try:
             collection_name = param.get('collection_name')
-            collection = Collection(collection_name)
-            
+            collection = self._get_collection(collection_name)
+
             text_contents = []
             embeddings = []
             metadatas = []
@@ -282,8 +296,8 @@ class MilvusVectorManager(BaseVectorDB):
         """
         try:
             collection_name = param.get('collection_name')
-            collection = Collection(collection_name)
-            
+            collection = self._get_collection(collection_name)
+
             # 加载集合到内存(如果还没有加载)
             collection.load()
             
@@ -305,7 +319,7 @@ class MilvusVectorManager(BaseVectorDB):
                 param=search_params,
                 limit=top_k,
                 expr=filter_expr,
-                output_fields=["text", "metadata"]
+                output_fields=["text_content", "metadata"]
             )
             
             # 格式化结果
@@ -314,8 +328,8 @@ class MilvusVectorManager(BaseVectorDB):
                 for hit in hits:
                     formatted_results.append({
                         'id': hit.id,
-                        'text_content': hit.entity.get('text'),
-                        'text': hit.entity.get('text'),  # 添加 text 字段以兼容现有代码
+                        'text_content': hit.entity.get('text_content'),
+                        'text': hit.entity.get('text_content'),
                         'metadata': hit.entity.get('metadata'),
                         'distance': hit.distance,
                         'similarity': 1 - hit.distance  # 转换为相似度
@@ -334,7 +348,7 @@ class MilvusVectorManager(BaseVectorDB):
         """
         try:
             collection_name = param.get('collection_name')
-            collection = Collection(collection_name)
+            collection = self._get_collection(collection_name)
             collection.load()
             
             query_embedding = self.text_to_vector(query_text)
@@ -353,7 +367,7 @@ class MilvusVectorManager(BaseVectorDB):
                 param=search_params,
                 limit=top_k,
                 expr=filter_expr,
-                output_fields=["text", "metadata"]
+                output_fields=["text_content", "metadata"]
             )
             
             formatted_results = []
@@ -393,7 +407,8 @@ class MilvusVectorManager(BaseVectorDB):
         
         return filter_expr
 
-    def create_hybrid_collection(self, collection_name: str, documents: List[Dict[str, Any]]):
+    def create_hybrid_collection(self, collection_name: str, documents: List[Dict[str, Any]],
+                                drop_old: bool = False):
         """
         创建支持混合搜索的集合
 
@@ -406,7 +421,7 @@ class MilvusVectorManager(BaseVectorDB):
             connection_args = {
                 "uri": f"http://{self.host}:{self.port}",
                 "user": self.user,
-                "db_name": "lq_db"
+                "db_name": self.milvus_db
             }
 
             if self.password:
@@ -417,11 +432,11 @@ class MilvusVectorManager(BaseVectorDB):
             for doc in documents:
                 content = doc.get('content', '')
                 metadata = doc.get('metadata', {})
-                processed_metadata = self._process_metadata(doc)
+                processed_metadata = self._process_metadata(metadata)
                 langchain_doc = Document(page_content=content, metadata=processed_metadata)
                 langchain_docs.append(langchain_doc)
 
-            # 创建混合搜索向量存储 (完全按照 test_hybrid_v2.6.py 的逻辑)
+            # 创建混合搜索向量存储
             vectorstore = Milvus.from_documents(
                 documents=langchain_docs,
                 embedding=self.emdmodel,
@@ -430,7 +445,7 @@ class MilvusVectorManager(BaseVectorDB):
                 connection_args=connection_args,
                 collection_name=collection_name,
                 consistency_level="Strong",
-                drop_old=True,
+                drop_old=drop_old,
             )
 
             _get_logger().info(f"Created hybrid collection: {collection_name} with {len(documents)} documents")
@@ -461,7 +476,7 @@ class MilvusVectorManager(BaseVectorDB):
         """
         try:
             collection_name = param.get('collection_name')
-            logger.info(f"开始 hybrid_search, collection_name: {collection_name}")
+            _get_logger().info(f"开始 hybrid_search, collection_name: {collection_name}")
             # 使用预创建的连接,避免运行时竞争
             if collection_name in self._vectorstore_cache:
                 vectorstore = self._vectorstore_cache[collection_name]
@@ -519,22 +534,6 @@ class MilvusVectorManager(BaseVectorDB):
                     'similarity': float(similarity)  # 转换为相似度
                 })
 
-            #     # 记录每个结果的评分信息
-            #     metadata = doc.metadata.get('metadata', {})
-            #     title = 'N/A'
-            #     if isinstance(metadata, str):
-            #         try:
-            #             import json
-            #             inner_metadata = json.loads(metadata)
-            #             title = inner_metadata.get('title', 'N/A')
-            #         except:
-            #             pass
-            #     else:
-            #         title = metadata.get('title', 'N/A')
-
-            #     _get_logger().info(f"混合搜索评分: 标题='{title}', 距离={score:.4f}, 相似度={similarity:.4f}")
-
-            # _get_logger().info(f"Hybrid search returned {len(formatted_results)} results")
             return formatted_results
 
         except Exception as e:

+ 3 - 1
foundation/infrastructure/tracing/trace_context.py

@@ -144,8 +144,10 @@ class TraceFilter(logging.Filter):
     """
 
     def filter(self, record: logging.LogRecord) -> bool:
-        """为日志记录添加system_trace_id字段"""
+        """为日志记录添加system_trace_id和log_type字段"""
         record.system_trace_id = TraceContext.get_trace_id()
+        if not hasattr(record, 'log_type'):
+            record.log_type = 'system'
         return True
 
 

+ 12 - 0
foundation/observability/logger/loggering.py

@@ -360,6 +360,18 @@ write_logger = ModuleLogger(
 )
 write_logger.info(f"construction_write logger initialized, log_dir: {os.path.join(base_log_dir, 'construction_write')}")
 
+# 脱敏模块专用logger(按周组织日志目录,日志写入 review/desensitize/ 子目录)
+desensitize_logger = ModuleLogger(
+    name="desensitize",
+    module_name="desensitize",
+    log_dir=os.path.join(base_log_dir, "construction_review"),
+    console_output=True,
+    file_max_mb=file_max,
+    backup_count=backup,
+    weekly=True
+)
+desensitize_logger.info(f"desensitize logger initialized, log_dir: {os.path.join(base_log_dir, 'construction_review', 'desensitize')}")
+
 
 def configure_logging_for_subprocess():
     """