Bladeren bron

v0.0.3-调试-debug
- 修复多路召回数据对应

WangXuMing 2 maanden geleden
bovenliggende
commit
5801a844d3

+ 2 - 0
config/config.ini

@@ -112,9 +112,11 @@ PGVECTOR_PASSWORD=pg16@123
 MILVUS_HOST=192.168.92.61
 MILVUS_PORT=19530
 MILVUS_DB=lq_db
+MILVUS_COLLECTION=first_bfp_collection
 MILVUS_USER=
 MILVUS_PASSWORD=
 
+
 [hybrid_search]
 # 混合检索权重配置
 DENSE_WEIGHT=0.7

+ 7 - 117
core/construction_review/component/ai_review_engine.py

@@ -46,10 +46,13 @@
 """
 
 import time
+import json
 import asyncio
 from enum import Enum
 from dataclasses import dataclass
 from typing import Dict, List, Any
+from foundation.ai.rag.retrieval.retrieval import retrieval_manager
+from foundation.infrastructure.config.config import config_handler
 from foundation.observability.logger.loggering import server_logger as logger
 from core.construction_review.component.reviewers.base_reviewer import BaseReviewer,BaseRAGReviewer
 @dataclass
@@ -79,10 +82,7 @@ class Stage(Enum):
         'design': 'design_values_check'
     }
     RAG = {
-        'reviewer_type':'rag',
         'rag': 'rag_enhanced_review',
-        'vector': 'vector_search_review',
-        'hybrid': 'hybrid_search_review'
     }
     AI = {
         'reviewer_type':'ai',
@@ -100,6 +100,7 @@ class AIReviewEngine(BaseReviewer):
         super().__init__()
         self.max_concurrent_reviews = max_concurrent_reviews
         self.semaphore = asyncio.Semaphore(max_concurrent_reviews)
+        self.milvus_collection = config_handler.get('milvus', 'MILVUS_COLLECTION', 'default')
 
     def set_review_location_label(self, location_label: str):
         """
@@ -247,19 +248,11 @@ class AIReviewEngine(BaseReviewer):
             Dict[str, Any]: RAG增强审查结果
         """
         # 向量检索
-        vector_results = await self.vector_search(unit_content['content'])
-
-        # 混合检索
-        hybrid_results = await self.hybrid_search(unit_content['content'])
-
-        # 重排序
-        reranked_results = await self.rerank_results(unit_content['content'], hybrid_results)
-
+        vector_results = retrieval_manager.multi_stage_recall(self.milvus_collection, unit_content['content'])
+        vector_results[0]
         return {
             'vector_search': vector_results,
-            'hybrid_search': hybrid_results,
-            'reranked_results': reranked_results,
-            'enhanced_suggestions': self.generate_enhanced_suggestions(reranked_results)
+
         }
 
 
@@ -396,60 +389,6 @@ class AIReviewEngine(BaseReviewer):
                                stage_name, state, current_progress)
 
     # RAG检索增强
-    async def vector_search(self, content: str) -> List[Dict[str, Any]]:
-        """
-        向量检索
-
-        Args:
-            content: 检索内容
-
-        Returns:
-            List[Dict[str, Any]]: 向量检索结果列表
-        """
-        await asyncio.sleep(0.1)
-        return [{"similarity": 0.85, "content": "相关标准1"}, {"similarity": 0.78, "content": "相关标准2"}]
-
-    async def hybrid_search(self, content: str) -> List[Dict[str, Any]]:
-        """
-        混合检索
-
-        Args:
-            content: 检索内容
-
-        Returns:
-            List[Dict[str, Any]]: 混合检索结果列表
-        """
-        await asyncio.sleep(0.2)
-        return [{"score": 0.88, "content": "混合检索结果1"}, {"score": 0.82, "content": "混合检索结果2"}]
-
-    async def rerank_results(self, content: str, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """
-        重排序
-
-        Args:
-            content: 原始内容
-            results: 待重排序的结果列表
-
-        Returns:
-            List[Dict[str, Any]]: 重排序后的结果列表
-        """
-        await asyncio.sleep(0.1)
-        return sorted(results, key=lambda x: x.get('score', 0), reverse=True)[:5]
-
-    def generate_enhanced_suggestions(self, results: List[Dict[str, Any]]) -> List[str]:
-        """
-        生成增强建议
-
-        Args:
-            results: 检索结果列表
-
-        Returns:
-            List[str]: 增强建议列表
-        """
-        suggestions = []
-        for result in results:
-            suggestions.append(f"基于{result.get('content', '相关内容')}的建议")
-        return suggestions
 
     def _calculate_basic_score(self, grammar: Dict, semantic: Dict, completeness: Dict) -> float:
         """
@@ -479,54 +418,5 @@ class AIReviewEngine(BaseReviewer):
         """
         return (mandatory.get('compliance_rate', 0) + design.get('accuracy', 0) + technical.get('precision', 0)) / 3
 
-    def _calculate_overall_risk(self, basic: Dict, technical: Dict, rag: Dict) -> str:
-        """
-        计算总体风险等级
-
-        Args:
-            basic: 基础合规性结果
-            technical: 技术合规性结果
-            rag: RAG增强审查结果
-
-        Returns:
-            str: 风险等级 ("low", "medium", "high")
-        """
-        basic_score = basic.get('overall_score', 0)
-        technical_score = technical.get('overall_score', 0)
-
-        # 如果RAG被禁用或有错误,忽略它
-        if 'error' in rag:
-            avg_score = (basic_score + technical_score) / 2
-        else:
-            rag_score = rag.get('overall_score', 0)
-            avg_score = (basic_score + technical_score + rag_score) / 3
-
-        if avg_score >= 90:
-            return "low"
-        elif avg_score >= 70:
-            return "medium"
-        else:
-            return "high"
-
-    def _aggregate_results(self, results: List[ReviewResult]) -> Dict[str, Any]:
-        """
-        汇总审查结果
-
-        Args:
-            results: 审查结果列表
-
-        Returns:
-            Dict[str, Any]: 汇总后的统计结果
-        """
-        risk_counts = {"high": 0, "medium": 0, "low": 0}
 
-        for result in results:
-            risk_counts[result.overall_risk] += 1
 
-        return {
-            "risk_distribution": risk_counts,
-            "total_issues": len([r for r in results if r.overall_risk != "low"]),
-            "high_risk_count": risk_counts["high"],
-            "medium_risk_count": risk_counts["medium"],
-            "low_risk_count": risk_counts["low"]
-        }

+ 0 - 0
database/base/__init__.py


+ 0 - 0
database/migrations/__init__.py


+ 0 - 0
database/models/__init__.py


+ 0 - 0
database/repositories/__init__.py


+ 0 - 36
database/repositories/bus_data_query.py

@@ -1,36 +0,0 @@
-from typing import List, Tuple, Any, Optional, Dict
-from foundation.observability.logger.loggering import server_logger
-from foundation.utils.common import handler_err
-from foundation.database.base.sql.async_mysql_base_dao import AsyncBaseDAO
-
-
-class BasisOfPreparationDAO(AsyncBaseDAO):
-    """异步编制依据 对象"""
-    
-    
-    async def get_info_by_id(self, id: int) -> Optional[Dict]:
-        """根据ID获取编制依据"""
-        query = "SELECT * FROM t_basis_of_preparation WHERE id = %s"
-        return await self.fetch_one(query, (id,))
-    
-    async def get_list(self) -> List[Dict]:
-        """获取所有编制依据"""
-        query = "SELECT * FROM t_basis_of_preparation WHERE status = 'current' ORDER BY created_at DESC"
-        return await self.fetch_all(query)
-    
-
-    async def get_info_by_condition(self, conditions: Dict) -> List[Dict]:
-        """根据条件查询编制依据"""
-        if not conditions:
-            return await self.get_list()
-        
-        try:
-            where_clause = " AND ".join([f"{field} = %s" for field in conditions.keys()])
-            where_values = list(conditions.values())
-            
-            query = f"SELECT * FROM t_basis_of_preparation WHERE {where_clause} AND status = 'current' ORDER BY created_at DESC"
-            return await self.fetch_all(query, tuple(where_values))
-            
-        except Exception as err:
-            handler_err(logger=server_logger, err=err, err_name="条件查询失败")
-            raise

+ 107 - 0
debug_rag_mapping.py

@@ -0,0 +1,107 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""
+调试RAG检索元数据映射问题
+"""
+
+import sys
+import os
+import json
+sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+
+from foundation.ai.rag.retrieval.retrieval import retrieval_manager
+from foundation.ai.models.rerank_model import rerank_model
+from foundation.observability.logger.loggering import server_logger as logger
+
+
+def debug_multi_stage_recall(collection_name, query):
+    """
+    Print a step-by-step trace of multi-stage recall to debug metadata mapping.
+
+    Stage 1 calls retrieval_manager.hybrid_search_recall (top_k=5, weighted
+    ranker) and prints each hit's text and metadata. Stage 2 sends the hit
+    texts straight to rerank_model.bge_rerank (top_k=3) and prints the raw
+    text, score and index it returns. Stage 3 calls
+    retrieval_manager.rerank_recall on the same candidates and checks that
+    hybrid_results[original_index] carries the same first 100 characters as
+    each reranked result.
+
+    Args:
+        collection_name: Collection passed to the hybrid search.
+        query: Query text used for both the hybrid search and the reranking.
+
+    Returns:
+        None. All output goes to stdout; any exception is caught, printed
+        together with its traceback, and not re-raised.
+
+    NOTE(review): stages 2 and 3 both pass the same candidates to the
+    reranker, so it presumably runs twice per call - confirm that is intended.
+    """
+    try:
+        print("=" * 80)
+        print("调试多阶段召回元数据映射问题")
+        print("=" * 80)
+
+        # Stage 1: fetch the hybrid search results
+        print("\n第一阶段:混合搜索召回")
+        hybrid_results = retrieval_manager.hybrid_search_recall(
+            collection_name=collection_name,
+            query_text=query,
+            top_k=5,
+            ranker_type="weighted"
+        )
+
+        print(f"混合搜索返回 {len(hybrid_results)} 个结果:")
+        for i, result in enumerate(hybrid_results):
+            print(f"\n--- 混合搜索结果 {i} ---")
+            print(f"文本内容(前100字符): {result['text_content'][:100]}...")
+            print(f"元数据: {result.get('metadata', {})}")
+
+        # Extract the candidate document texts
+        candidates = [result['text_content'] for result in hybrid_results]
+        print(f"\n提取的候选文档数量: {len(candidates)}")
+
+        # Stage 2: fetch the raw rerank results
+        print("\n第二阶段:重排序召回")
+        rerank_api_results = rerank_model.bge_rerank(query, candidates, top_k=3)
+
+        print(f"\nBGE重排序API原始返回 {len(rerank_api_results)} 个结果:")
+        for i, result in enumerate(rerank_api_results):
+            print(f"\n--- BGE API结果 {i} ---")
+            print(f"文本内容(前100字符): {result['text'][:100]}...")
+            print(f"重排序分数: {result.get('score', 'N/A')}")
+            print(f"API返回的索引: {result.get('index', 'N/A')}")
+
+        # Stage 3: metadata mapping
+        print("\n第三阶段:元数据映射")
+        rerank_results = retrieval_manager.rerank_recall(
+            candidates=candidates,
+            query_text=query,
+            top_k=3
+        )
+
+        print(f"\n重排序处理后的结果:")
+        for i, result in enumerate(rerank_results):
+            print(f"\n--- 重排序处理结果 {i} ---")
+            print(f"文本内容(前100字符): {result['text_content'][:100]}...")
+            print(f"重排序分数: {result.get('rerank_score', 'N/A')}")
+            print(f"原始索引: {result.get('original_index', 'N/A')}")
+            print(f"重排序排名: {result.get('rerank_rank', 'N/A')}")
+            print(f"映射的元数据: {result.get('metadata', {})}")
+
+            # Verify that the mapping is correct
+            orig_idx = result.get('original_index', 0)
+            if orig_idx < len(hybrid_results):
+                expected_content = hybrid_results[orig_idx]['text_content'][:100]
+                actual_content = result['text_content'][:100]
+                is_match = expected_content == actual_content
+                print(f"内容匹配验证: {'正确' if is_match else '错误'}")
+                if not is_match:
+                    print(f"   期望内容: {expected_content}...")
+                    print(f"   实际内容: {actual_content}...")
+            else:
+                print(f"索引越界: original_index={orig_idx} >= hybrid_results长度={len(hybrid_results)}")
+
+    except Exception as e:
+        print(f"[ERROR] 调试失败: {str(e)}")
+        import traceback
+        traceback.print_exc()
+
+
+def main():
+    """
+    Entry point of the debug script.
+
+    Prints a start banner, then runs debug_multi_stage_recall with a fixed
+    query against the hard-coded "first_bfp_collection" collection.
+    """
+    print("开始RAG元数据映射调试")
+
+    # Use a short query so the mapping is easier to follow
+    query = "水泥混凝土路面"
+    collection_name = "first_bfp_collection"
+
+    debug_multi_stage_recall(collection_name, query)
+
+
+if __name__ == "__main__":
+    main()

+ 1 - 1
foundation/ai/models/rerank_model.py

@@ -42,7 +42,7 @@ class LqReranker:
                 top_k = self.top_k
             
 
-            server_logger.info(f"开始执行重排序,查询: {query}, 候选文档数量: {len(candidates)}")
+            server_logger.info(f"开始执行重排序,查询:, 候选文档数量: {len(candidates)}")
 
             # 构建重排序请求
             rerank_request = {

+ 42 - 14
foundation/ai/rag/retrieval/retrieval.py

@@ -39,7 +39,7 @@ class RetrievalManager:
             List[Dict]: 搜索结果列表
         """
         try:
-            self.logger.info(f"开始混合搜索召回,查询: {query_text}")
+            self.logger.info(f"开始混合检索")
 
             param = {'collection_name': collection_name}
             results = self.vector_manager.hybrid_search(
@@ -69,7 +69,7 @@ class RetrievalManager:
             top_k: 返回结果数量
 
         Returns:
-            List[Dict]: 重排序后的结果列表
+            List[Dict]: 重排序后的结果列表,包含原始索引信息
         """
         try:
             self.logger.info(f"开始重排序召回,候选文档数量: {len(candidates)}")
@@ -77,14 +77,30 @@ class RetrievalManager:
             # 调用重排序执行器
             rerank_results = rerank_model.bge_rerank(query_text, candidates, top_k)
 
-            # 转换结果格式
+            # 转换结果格式,通过文本匹配找到正确的原始索引
             scored_docs = []
-            for i, result in enumerate(rerank_results):
+            for i, api_result in enumerate(rerank_results):
+                rerank_text = api_result.get('text', '')
+                rerank_score = float(api_result.get('score', '0.0'))
+
+                # 通过文本匹配找到原始在candidates中的索引
+                original_index = None
+                for j, candidate_text in enumerate(candidates):
+                    if candidate_text == rerank_text:
+                        original_index = j
+                        break
+
+                if original_index is None:
+                    self.logger.warning(f"无法找到重排序结果的原始索引,文本: {rerank_text[:50]}...")
+                    original_index = i  # 回退到当前索引
+
                 scored_docs.append({
-                    'text_content': result.get('text', ''),
-                    'rerank_score': float(result.get('score', '0.0')),
-                    'index': i
+                    'text_content': rerank_text,
+                    'rerank_score': rerank_score,
+                    'original_index': original_index,  # 正确的原始索引
+                    'rerank_rank': i  # 重排序后的排名
                 })
+                self.logger.debug(f"重排序结果 {i}: 原始索引={original_index}, 重排序分数={rerank_score}")
 
             self.logger.info(f"重排序召回返回 {len(scored_docs)} 个结果")
             return scored_docs
@@ -94,7 +110,7 @@ class RetrievalManager:
             return []
 
     def multi_stage_recall(self, collection_name: str, query_text: str,
-                          hybrid_top_k: int = 50, top_k: int = 10,
+                          hybrid_top_k: int = 50, top_k: int = 3,
                           ranker_type: str = "weighted") -> List[Dict[str, Any]]:
         """
         多路召回 - 先混合搜索召回,再重排序,只返回重排序结果
@@ -110,7 +126,7 @@ class RetrievalManager:
             List[Dict]: 重排序后的结果列表,只包含重排序分数
         """
         try:
-            self.logger.info(f"开始多路召回,查询: {query_text}")
+            self.logger.info(f"执行多路召回")
 
             # 第一阶段:混合搜索召回(向量+BM25)
             hybrid_results = self.hybrid_search_recall(
@@ -134,15 +150,27 @@ class RetrievalManager:
                 top_k=top_k
             )
 
-            # 为重排序结果添加混合搜索的原始元数据
+            # 为重排序结果添加混合搜索的原始元数据,只保留text_content和metadata
+            final_results = []
             for rerank_result in rerank_results:
-                original_index = rerank_result.get('index', 0)
+                # 使用正确的原始索引进行元数据映射
+                original_index = rerank_result.get('original_index', 0)
                 if original_index < len(hybrid_results):
                     original_metadata = hybrid_results[original_index].get('metadata', {})
-                    rerank_result['metadata'] = original_metadata
 
-            self.logger.info(f"多路召回完成,返回 {len(rerank_results)} 个重排序结果")
-            return rerank_results
+                    # 只输出text_content和metadata
+                    final_result = {
+                        'text_content': rerank_result['text_content'],
+                        'metadata': original_metadata
+                    }
+                    final_results.append(final_result)
+
+                    self.logger.debug(f"元数据映射成功: 重排序排名{rerank_result.get('rerank_rank')} -> 原始索引{original_index}")
+                else:
+                    self.logger.warning(f"元数据映射失败: 原始索引{original_index}超出范围(0-{len(hybrid_results)-1})")
+
+            self.logger.info(f"多路召回完成,返回 {len(final_results)} 个重排序结果")
+            return final_results
 
         except Exception as e:
             self.logger.error(f"多路召回失败: {str(e)}")

+ 0 - 245
test/test_hybrid_v2.6.py

@@ -1,245 +0,0 @@
-#!/usr/bin/env python3
-"""
-测试 Milvus v2.6 混合搜索功能
-"""
-
-import sys
-import os
-
-# 添加项目根目录到路径
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-print("Milvus v2.6 混合搜索测试")
-print("=" * 50)
-
-def test_hybrid_search_v26():
-    """测试 v2.6 混合搜索功能"""
-
-    try:
-        # 检查版本
-        import pymilvus
-        print(f"PyMilvus 版本: {pymilvus.__version__}")
-
-        # 连接服务器并检查版本
-        from pymilvus import connections, utility
-        connections.connect(
-            alias="default",
-            host='192.168.92.61',
-            port='19530',
-            db_name="lq_db"
-        )
-
-        try:
-            server_version = utility.get_server_version()
-            print(f"Milvus 服务器版本: {server_version}")
-        except Exception as e:
-            print(f"获取服务器版本失败: {e}")
-
-        # 导入必要组件
-        from langchain_milvus import Milvus, BM25BuiltInFunction
-        from langchain_core.documents import Document
-        from foundation.ai.models.model_handler import model_handler
-
-        print("✓ 导入成功")
-
-        # 获取嵌入模型
-        emdmodel = model_handler._get_lq_qwen3_8b_emd()
-        print("✓ 嵌入模型加载成功")
-
-        # 创建测试文档
-        test_docs = [
-            Document(
-                page_content="四川路桥建设集团专注于桥梁和隧道工程建设",
-                metadata={"category": "company", "type": "construction"}
-            ),
-            Document(
-                page_content="高速公路桥梁建设技术包括预应力混凝土和钢结构",
-                metadata={"category": "technology", "type": "highway"}
-            ),
-            Document(
-                page_content="隧道工程施工方法包括盾构法和钻爆法",
-                metadata={"category": "method", "type": "tunnel"}
-            ),
-            Document(
-                page_content="人工智能在建筑行业应用于智能监控和自动化施工",
-                metadata={"category": "ai", "type": "technology"}
-            ),
-            Document(
-                page_content="BIM技术在路桥工程中的数字化应用越来越普及",
-                metadata={"category": "bim", "type": "digital"}
-            )
-        ]
-
-        print(f"✓ 创建 {len(test_docs)} 个测试文档")
-
-        # 连接参数
-        connection_args = {
-            "uri": "http://192.168.92.61:19530",
-            "user": None,
-            "db_name": "lq_db"
-        }
-
-        collection_name = "test_hybrid_v26"
-
-        print("\n🚀 创建混合搜索向量存储...")
-        vectorstore = Milvus.from_documents(
-            documents=test_docs,
-            embedding=emdmodel,
-            builtin_function=BM25BuiltInFunction(),
-            vector_field=["dense", "sparse"],
-            connection_args=connection_args,
-            collection_name=collection_name,
-            consistency_level="Strong",
-            drop_old=True,
-        )
-        print("✅ 混合搜索向量存储创建成功!")
-
-        # 测试不同的搜索策略
-        print("\n🔍 测试混合搜索功能...")
-
-        # 1. 加权搜索
-        print("\n1. 加权搜索 (dense=0.7, sparse=0.3):")
-        results = vectorstore.similarity_search(
-            query="桥梁建设技术",
-            k=3,
-            ranker_type="weighted",
-            ranker_params={"weights": [0.7, 0.3]}
-        )
-        print(f"   找到 {len(results)} 个结果:")
-        for i, result in enumerate(results):
-            content = result.page_content[:50]
-            category = result.metadata.get('category', 'N/A')
-            print(f"   {i+1}. {content}... (类别: {category})")
-
-        # 2. RRF 搜索
-        print("\n2. RRF 搜索:")
-        rrf_results = vectorstore.similarity_search(
-            query="人工智能应用",
-            k=2,
-            ranker_type="rrf",
-            ranker_params={"k": 60}
-        )
-        print(f"   找到 {len(rrf_results)} 个结果:")
-        for i, result in enumerate(rrf_results):
-            content = result.page_content[:50]
-            print(f"   {i+1}. {content}...")
-
-        # 3. 默认搜索
-        print("\n3. 默认搜索:")
-        default_results = vectorstore.similarity_search(
-            query="BIM技术应用",
-            k=2
-        )
-        print(f"   找到 {len(default_results)} 个结果:")
-        for i, result in enumerate(default_results):
-            content = result.page_content[:50]
-            print(f"   {i+1}. {content}...")
-
-        # # 清理
-        # if utility.has_collection(collection_name):
-        #     utility.drop_collection(collection_name)
-        #     print(f"\n✅ 清理测试集合: {collection_name}")
-
-        return True
-
-    except Exception as e:
-        print(f"❌ 测试失败: {e}")
-        import traceback
-        traceback.print_exc()
-        return False
-
-def test_advanced_hybrid_features():
-    """测试高级混合搜索功能"""
-
-    try:
-        print("\n🎯 测试高级混合搜索功能...")
-
-        from langchain_milvus import Milvus, BM25BuiltInFunction
-        from langchain_core.documents import Document
-        from foundation.ai.models.model_handler import model_handler
-
-        emdmodel = model_handler._get_lq_qwen3_8b_emd()
-
-        # 测试多种权重配置
-        docs = [
-            Document(page_content="深度学习技术在图像识别中的应用", metadata={"domain": "ai", "type": "dl"}),
-            Document(page_content="机器学习算法在数据挖掘中的实践", metadata={"domain": "ai", "type": "ml"}),
-            Document(page_content="神经网络模型的优化方法研究", metadata={"domain": "ai", "type": "nn"}),
-        ]
-
-        connection_args = {
-            "uri": "http://192.168.92.61:19530",
-            "user": None,
-            "db_name": "lq_db"
-        }
-
-        collection_name = "test_advanced_hybrid"
-
-        # 创建向量存储
-        vectorstore = Milvus.from_documents(
-            documents=docs,
-            embedding=emdmodel,
-            builtin_function=BM25BuiltInFunction(),
-            vector_field=["dense", "sparse"],
-            connection_args=connection_args,
-            collection_name=collection_name,
-            consistency_level="Strong",
-            drop_old=True,
-        )
-
-        print("✅ 高级混合搜索测试集创建成功")
-
-        # 测试不同的权重组合
-        test_configs = [
-            {"name": "语义优先", "weights": [0.9, 0.1]},
-            {"name": "关键词优先", "weights": [0.1, 0.9]},
-            {"name": "平衡配置", "weights": [0.5, 0.5]},
-        ]
-
-        for config in test_configs:
-            results = vectorstore.similarity_search(
-                query="深度学习模型",
-                k=2,
-                ranker_type="weighted",
-                ranker_params={"weights": config["weights"]}
-            )
-            print(f"   {config['name']} ({config['weights']}): {len(results)} 个结果")
-
-        # 清理
-        from pymilvus import utility
-        if utility.has_collection(collection_name):
-            utility.drop_collection(collection_name)
-
-        return True
-
-    except Exception as e:
-        print(f"❌ 高级功能测试失败: {e}")
-        return False
-
-if __name__ == "__main__":
-    print("开始 Milvus v2.6 混合搜索测试...")
-
-    # 基础混合搜索测试
-    basic_success = test_hybrid_search_v26()
-
-    # 高级功能测试
-    if basic_success:
-        advanced_success = test_advanced_hybrid_features()
-    else:
-        advanced_success = False
-
-    print("\n" + "=" * 50)
-    print("测试结果总结:")
-    print(f"✅ 基础混合搜索: {'成功' if basic_success else '失败'}")
-    print(f"✅ 高级混合搜索: {'成功' if advanced_success else '失败'}")
-
-    if basic_success and advanced_success:
-        print("\n🎉 恭喜!Milvus v2.6 混合搜索功能完全正常!")
-        print("\n📝 可以在你的项目中使用以下功能:")
-        print("- ✓ create_hybrid_collection() 方法")
-        print("- ✓ hybrid_search() 方法")
-        print("- ✓ 加权搜索 (ranker_type='weighted')")
-        print("- ✓ RRF 搜索 (ranker_type='rrf')")
-        print("- ✓ 自定义权重配置")
-    else:
-        print("\n❌ 仍有问题需要解决")

+ 0 - 212
test/test_milvus_methods.py

@@ -1,212 +0,0 @@
-#!/usr/bin/env python3
-"""
-直接测试 milvus_vector.py 中的 create_hybrid_collection 和 hybrid_search 方法
-"""
-
-import sys
-import os
-
-# 添加项目根目录到路径
-sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-print("测试 MilvusVectorManager 的 create_hybrid_collection 和 hybrid_search 方法")
-print("=" * 70)
-
-def test_milvus_vector_manager_methods():
-    """直接测试 MilvusVectorManager 类的方法"""
-
-    try:
-        # 导入并初始化 MilvusVectorManager
-        from foundation.database.base.vector.milvus_vector import MilvusVectorManager
-        print("✓ 成功导入 MilvusVectorManager")
-
-        # 初始化管理器
-        manager = MilvusVectorManager()
-        print("✓ MilvusVectorManager 初始化成功")
-
-        # 测试数据
-        test_documents = [
-            {
-                'content': '四川路桥建设集团专注于桥梁和隧道工程建设',
-                'metadata': {'category': 'company', 'industry': 'construction', 'id': 1}
-            },
-            {
-                'content': '高速公路桥梁建设技术包括预应力混凝土桥梁和钢结构桥梁',
-                'metadata': {'category': 'technology', 'industry': 'highway', 'id': 2}
-            },
-            {
-                'content': '隧道工程施工方法包括盾构法、钻爆法和明挖法',
-                'metadata': {'category': 'method', 'industry': 'tunnel', 'id': 3}
-            },
-            {
-                'content': '人工智能在建筑行业的应用包括智能监控和自动化施工',
-                'metadata': {'category': 'ai', 'industry': 'technology', 'id': 4}
-            },
-            {
-                'content': 'BIM技术在路桥工程中的数字化应用越来越普及',
-                'metadata': {'category': 'bim', 'industry': 'digital', 'id': 5}
-            }
-        ]
-
-        collection_name = "test_milvus_methods"
-
-        print(f"\n🚀 测试 create_hybrid_collection 方法...")
-        print(f"   准备创建集合: {collection_name}")
-        print(f"   文档数量: {len(test_documents)}")
-
-        # 调用 create_hybrid_collection 方法
-        vectorstore = manager.create_hybrid_collection(
-            collection_name=collection_name,
-            documents=test_documents
-        )
-        print("✅ create_hybrid_collection 执行成功!")
-        print(f"   返回的 vectorstore 类型: {type(vectorstore)}")
-
-        # 等待索引创建完成
-        import time
-        time.sleep(3)
-
-        print(f"\n🔍 测试 hybrid_search 方法...")
-
-        # 测试参数
-        param = {'collection_name': collection_name}
-
-        # 1. 测试加权搜索
-        print("\n   1. 测试加权混合搜索:")
-        query1 = "桥梁建设技术"
-        print(f"   查询: '{query1}'")
-        results1 = manager.hybrid_search(
-            param=param,
-            query_text=query1,
-            top_k=3,
-            ranker_type="weighted",
-            dense_weight=0.7,
-            sparse_weight=0.3
-        )
-        print(f"   找到 {len(results1)} 个结果:")
-        for i, result in enumerate(results1):
-            content = result.get('text_content', '')[:50]
-            similarity = result.get('similarity', 0)
-            metadata = result.get('metadata', {})
-            print(f"   {i+1}. {content}... (相似度: {similarity:.4f})")
-            print(f"      元数据: {metadata}")
-
-        # 2. 测试RRF搜索
-        print("\n   2. 测试RRF混合搜索:")
-        query2 = "人工智能应用"
-        print(f"   查询: '{query2}'")
-        results2 = manager.hybrid_search(
-            param=param,
-            query_text=query2,
-            top_k=2,
-            ranker_type="rrf"
-        )
-        print(f"   找到 {len(results2)} 个结果:")
-        for i, result in enumerate(results2):
-            content = result.get('text_content', '')
-            metadata = result.get('metadata', {})
-            print(f"   {i+1}. {content}")
-            print(f"      元数据: {metadata}")
-
-        # 3. 测试不同权重配置
-        print("\n   3. 测试不同权重配置:")
-        query3 = "路桥工程"
-        weight_configs = [
-            {"dense": 0.8, "sparse": 0.2, "name": "语义优先"},
-            {"dense": 0.2, "sparse": 0.8, "name": "关键词优先"},
-            {"dense": 0.5, "sparse": 0.5, "name": "平衡配置"}
-        ]
-
-        for config in weight_configs:
-            print(f"   {config['name']} (dense={config['dense']}, sparse={config['sparse']}):")
-            results3 = manager.hybrid_search(
-                param=param,
-                query_text=query3,
-                top_k=2,
-                ranker_type="weighted",
-                dense_weight=config["dense"],
-                sparse_weight=config["sparse"]
-            )
-            print(f"     返回 {len(results3)} 个结果")
-            if results3:
-                best_content = results3[0].get('text_content', '')[:50]
-                print(f"     最佳匹配: {best_content}...")
-
-        # 清理测试集合
-        print(f"\n🧹 清理测试集合...")
-        try:
-            from pymilvus import utility
-            if utility.has_collection(collection_name):
-                utility.drop_collection(collection_name)
-                print(f"✅ 成功清理集合: {collection_name}")
-        except Exception as e:
-            print(f"⚠️ 清理集合失败: {e}")
-
-        return True
-
-    except Exception as e:
-        print(f"❌ 测试失败: {e}")
-        import traceback
-        traceback.print_exc()
-        return False
-
-def test_method_signatures():
-    """测试方法签名和基本功能"""
-
-    try:
-        from foundation.database.base.vector.milvus_vector import MilvusVectorManager
-
-        print("\n📋 方法签名检查:")
-
-        # 检查 create_hybrid_collection 方法
-        import inspect
-        create_sig = inspect.signature(MilvusVectorManager.create_hybrid_collection)
-        print(f"   create_hybrid_collection{create_sig}")
-
-        # 检查 hybrid_search 方法
-        hybrid_sig = inspect.signature(MilvusVectorManager.hybrid_search)
-        print(f"   hybrid_search{hybrid_sig}")
-
-        # 检查方法是否存在
-        methods = dir(MilvusVectorManager())
-        has_create = 'create_hybrid_collection' in methods
-        has_hybrid = 'hybrid_search' in methods
-
-        print(f"\n   方法存在性检查:")
-        print(f"   create_hybrid_collection: {'✓' if has_create else '✗'}")
-        print(f"   hybrid_search: {'✓' if has_hybrid else '✗'}")
-
-        return has_create and has_hybrid
-
-    except Exception as e:
-        print(f"❌ 方法签名检查失败: {e}")
-        return False
-
-if __name__ == "__main__":
-    print("开始测试 MilvusVectorManager 的核心方法...")
-
-    # 方法签名检查
-    signature_ok = test_method_signatures()
-
-    if signature_ok:
-        # 核心功能测试
-        function_ok = test_milvus_vector_manager_methods()
-    else:
-        function_ok = False
-
-    print("\n" + "=" * 70)
-    print("测试结果总结:")
-    print(f"✅ 方法签名检查: {'通过' if signature_ok else '失败'}")
-    print(f"✅ 核心功能测试: {'通过' if function_ok else '失败'}")
-
-    if signature_ok and function_ok:
-        print("\n🎉 所有测试通过!")
-        print("\n📝 MilvusVectorManager 核心方法完全可用:")
-        print("   ✓ create_hybrid_collection() - 混合集合创建")
-        print("   ✓ hybrid_search() - 混合搜索")
-        print("   ✓ 加权搜索 (ranker_type='weighted')")
-        print("   ✓ RRF搜索 (ranker_type='rrf')")
-        print("   ✓ 自定义权重配置")
-        print("   ✓ 完整的错误处理和回退机制")
-    else:
-        print("\n❌ 部分测试失败,请检查实现")

File diff suppressed because it is too large
+ 49 - 0
test/test_rag.py


+ 0 - 93
test/test_rerank_simple.py

@@ -1,93 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-"""
-简化的重排序测试脚本
-"""
-
-import sys
-import os
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
-
-from foundation.ai.models.rerank_model import execute_rerank
-
-def test_rerank_function():
-    """
-    测试重排序功能
-    """
-    print("开始测试重排序功能")
-    print("=" * 50)
-
-    # 测试用例1:乔布斯查询
-    query1 = "乔布斯是谁?"
-    candidates1 = [
-        "大模型是一类具有大量参数的人工智能模型。",
-        "苹果是一家科技公司",
-        "大模型用于深度学习任务"
-    ]
-
-    print("测试用例1:")
-    print(f"查询: {query1}")
-    print("候选文档:")
-    for i, doc in enumerate(candidates1, 1):
-        print(f"  {i}. {doc}")
-
-    try:
-        results1 = execute_rerank(query1, candidates1, top_k=3)
-
-        print("\n重排序结果:")
-        for i, result in enumerate(results1, 1):
-            text = result.get("text", "")
-            score = result.get("score", "0.0")
-            print(f"  {i}. [{score}] {text}")
-
-        print(f"\n结果数量: {len(results1)}")
-        if results1:
-            print("测试用例1: 成功")
-        else:
-            print("测试用例1: 失败 - 没有返回结果")
-
-    except Exception as e:
-        print(f"测试用例1: 失败 - 异常: {str(e)}")
-
-    print("\n" + "-" * 50)
-
-    # 测试用例2:人工智能查询
-    query2 = "什么是人工智能?"
-    candidates2 = [
-        "人工智能是计算机科学的一个分支。",
-        "机器学习是人工智能的核心技术。",
-        "深度学习使用神经网络进行学习。",
-        "苹果公司生产iPhone手机。",
-        "Transformer是大模型的基础架构。"
-    ]
-
-    print("测试用例2:")
-    print(f"查询: {query2}")
-    print("候选文档:")
-    for i, doc in enumerate(candidates2, 1):
-        print(f"  {i}. {doc}")
-
-    try:
-        results2 = execute_rerank(query2, candidates2, top_k=5)
-
-        print("\n重排序结果:")
-        for i, result in enumerate(results2, 1):
-            text = result.get("text", "")
-            score = result.get("score", "0.0")
-            print(f"  {i}. [{score}] {text}")
-
-        print(f"\n结果数量: {len(results2)}")
-        if results2:
-            print("测试用例2: 成功")
-        else:
-            print("测试用例2: 失败 - 没有返回结果")
-
-    except Exception as e:
-        print(f"测试用例2: 失败 - 异常: {str(e)}")
-
-    print("\n" + "=" * 50)
-    print("重排序功能测试完成")
-
-if __name__ == "__main__":
-    test_rerank_function()

Some files were not shown because too many files changed in this diff