""" Milvus 搜索对比测试报告生成器 生成 Markdown 格式的对比报告,包含 BM25 全文检索和混合搜索的对比 """ from __future__ import annotations import os import sys from typing import Any, Dict, List from datetime import datetime import math # 将项目根目录加入 sys.path,确保能导入 foundation 等模块 BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, BASE_DIR) from pymilvus import MilvusClient, AnnSearchRequest, WeightedRanker from foundation.infrastructure.config.config import config_handler from foundation.ai.models.model_handler import model_handler # Collection 名称 CHILD_COLLECTION_NAME = "t_rag_kng_standard3" # 所有字段列表(排除 sparse,因为稀疏向量不能作为输出字段检索) ALL_FIELDS = [ "pk", "text", "dense", "document_id", "parent_id", "index", "tag_list", "permission", "metadata", "is_deleted", "created_by", "created_time", "updated_by", "updated_time", ] _MILVUS_CLIENT = None _EMBEDDINGS = None def get_milvusclient() -> MilvusClient: """懒加载 MilvusClient,避免重复初始化。""" global _MILVUS_CLIENT if _MILVUS_CLIENT is None: host = config_handler.get("milvus", "MILVUS_HOST", "localhost") port = int(config_handler.get("milvus", "MILVUS_PORT", "19530")) user = config_handler.get("milvus", "MILVUS_USER", "") password = config_handler.get("milvus", "MILVUS_PASSWORD", "") uri = f"http://{host}:{port}" conn_args = {"uri": uri, "db_name": "lq_db"} if user: conn_args["user"] = user if password: conn_args["password"] = password _MILVUS_CLIENT = MilvusClient(**conn_args) return _MILVUS_CLIENT def get_embeddings_model(): """懒加载 Embeddings,避免重复初始化。""" global _EMBEDDINGS if _EMBEDDINGS is None: _EMBEDDINGS = model_handler.get_embedding_model() return _EMBEDDINGS def search_by_bm25( query_text: str, collection_name: str = CHILD_COLLECTION_NAME, top_k: int = 3 ) -> List[Dict[str, Any]]: """ BM25 全文检索 返回的相似度为 Milvus 原生计算值,真实反映搜索匹配度 """ client = get_milvusclient() results = client.search( collection_name=collection_name, data=[query_text], anns_field="sparse", metric_type="BM25", limit=top_k, output_fields=ALL_FIELDS ) return format_results(results, top_k=top_k, search_type="bm25") def hybrid_search( query_text: str, collection_name: str = CHILD_COLLECTION_NAME, top_k: int = 3 ) -> List[Dict[str, Any]]: """ 混合搜索(Dense + Sparse) 参考 Milvus 多向量混合搜索: https://milvus.io/docs/zh/multi-vector-search.md 使用 WeightedRanker 对密集向量和稀疏向量搜索结果进行加权融合 """ client = get_milvusclient() embeddings = get_embeddings_model() query_vector = embeddings.embed_query(query_text) # 创建密集向量搜索请求 (语义相似度) dense_req = AnnSearchRequest( data=[query_vector], anns_field="dense", param={"metric_type": "COSINE"}, limit=top_k * 2 ) # 创建稀疏向量搜索请求 (BM25 关键词匹配) sparse_req = AnnSearchRequest( data=[query_text], anns_field="sparse", param={"metric_type": "BM25"}, limit=top_k * 2 ) # 使用加权排序器,平衡语义相似度和关键词匹配 # weights: [dense权重, sparse权重] ranker = WeightedRanker(0.5, 0.5) results = client.hybrid_search( collection_name=collection_name, reqs=[dense_req, sparse_req], ranker=ranker, limit=top_k, output_fields=ALL_FIELDS ) return format_results(results, top_k=top_k, search_type="hybrid") def cosine_similarity(vec1: List[float], vec2: List[float]) -> float: """计算余弦相似度,返回范围约为 [-1, 1]。""" if not vec1 or not vec2 or len(vec1) != len(vec2): return 0.0 dot = sum(a * b for a, b in zip(vec1, vec2)) norm1 = math.sqrt(sum(a * a for a in vec1)) norm2 = math.sqrt(sum(b * b for b in vec2)) if norm1 == 0 or norm2 == 0: return 0.0 return dot / (norm1 * norm2) def compute_query_text_similarity(query_text: str, content_text: str) -> float: """ 统一相似度定义: 使用同一向量模型计算 query 与召回文本的语义相似度(cosine)。 """ if not content_text: return 0.0 embeddings = get_embeddings_model() query_vec = embeddings.embed_query(query_text) content_vec = embeddings.embed_query(content_text) return cosine_similarity(query_vec, content_vec) def normalize_similarity_score(similarity: float) -> float: """将 [-1,1] 映射到 [0,1],便于在报告中直接对比。""" normalized = (similarity + 1.0) / 2.0 if normalized < 0: return 0.0 if normalized > 1: return 1.0 return normalized def enrich_with_unified_similarity(query_text: str, results: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """为每条检索结果补充可比较的统一相似度字段。""" enriched = [] for result in results: text = result.get("text", "") or "" semantic_similarity = compute_query_text_similarity(query_text, text) normalized_similarity = normalize_similarity_score(semantic_similarity) new_item = dict(result) new_item["semantic_similarity"] = semantic_similarity new_item["normalized_similarity"] = normalized_similarity enriched.append(new_item) return enriched def format_results(results, top_k: int = 3, search_type: str = "bm25") -> List[Dict[str, Any]]: """ 格式化搜索结果 ✅ 修复:直接使用 Milvus 原生返回的 distance(真实相似度) 返回相似度最高的前 top_k 个结果 """ formatted_results = [] for result_group in results: for item in result_group: entity = item.get("entity", {}) # ✅ 关键修复:保留 Milvus 计算的真实相似度分数 formatted_item = { "id": item.get("id"), "distance": item.get("distance", 0.0), # 原生搜索分数 "search_type": search_type, # 标记搜索类型 } formatted_item.update(entity) formatted_results.append(formatted_item) # 按相似度降序排序,确保 Top1 最高 formatted_results = sorted(formatted_results, key=lambda x: x["distance"], reverse=True) # 只返回前 top_k 个结果 return formatted_results[:top_k] def compare_search_results( bm25_results: List[Dict[str, Any]], hybrid_results: List[Dict[str, Any]] ) -> Dict[str, Any]: """ 对比两种搜索方式的结果 返回归一化后的相似度对比、排名对比、重叠度分析等 """ comparison = { "bm25": [], "hybrid": [], "overlap_analysis": {}, "ranking_comparison": {}, "score_comparison": [], "method_summary": {} } # 1. BM25 结果(已包含统一相似度) for result in bm25_results: comparison["bm25"].append({ "id": result.get('id'), "retrieval_score": result.get('distance', 0.0), "semantic_similarity": result.get('semantic_similarity', 0.0), "normalized_similarity": result.get('normalized_similarity', 0.0), "text": result.get('text', ''), "document_id": result.get('document_id', 'N/A') }) # 2. 混合搜索结果 for result in hybrid_results: comparison["hybrid"].append({ "id": result.get('id'), "retrieval_score": result.get('distance', 0.0), "semantic_similarity": result.get('semantic_similarity', 0.0), "normalized_similarity": result.get('normalized_similarity', 0.0), "text": result.get('text', ''), "document_id": result.get('document_id', 'N/A') }) # 3. 重叠度分析 bm25_ids = {r['id'] for r in comparison["bm25"]} hybrid_ids = {r['id'] for r in comparison["hybrid"]} common_ids = bm25_ids & hybrid_ids bm25_only = bm25_ids - hybrid_ids hybrid_only = hybrid_ids - bm25_ids comparison["overlap_analysis"] = { "common_count": len(common_ids), "bm25_only_count": len(bm25_only), "hybrid_only_count": len(hybrid_only), "overlap_rate": len(common_ids) / max(len(bm25_ids), 1), "common_ids": list(common_ids), "bm25_only_ids": list(bm25_only), "hybrid_only_ids": list(hybrid_only) } # 4. 排名对比(针对共同结果) ranking_comparison = [] for doc_id in common_ids: bm25_rank = next((i for i, r in enumerate(comparison["bm25"]) if r['id'] == doc_id), -1) hybrid_rank = next((i for i, r in enumerate(comparison["hybrid"]) if r['id'] == doc_id), -1) ranking_comparison.append({ "id": doc_id, "bm25_rank": bm25_rank + 1, "hybrid_rank": hybrid_rank + 1, "rank_diff": abs(bm25_rank - hybrid_rank) }) comparison["ranking_comparison"] = ranking_comparison # 5. 分数对比(针对共同结果) score_comparison = [] for doc_id in common_ids: bm25_result = next((r for r in comparison["bm25"] if r['id'] == doc_id), None) hybrid_result = next((r for r in comparison["hybrid"] if r['id'] == doc_id), None) if bm25_result and hybrid_result: score_comparison.append({ "id": doc_id, "bm25_similarity": bm25_result['normalized_similarity'], "hybrid_similarity": hybrid_result['normalized_similarity'], "score_diff": abs(bm25_result['normalized_similarity'] - hybrid_result['normalized_similarity']) }) comparison["score_comparison"] = score_comparison # 6. 方法级别汇总(用于 case 结论) bm25_top1 = comparison["bm25"][0]["normalized_similarity"] if comparison["bm25"] else 0.0 hybrid_top1 = comparison["hybrid"][0]["normalized_similarity"] if comparison["hybrid"] else 0.0 bm25_avg = ( sum(item["normalized_similarity"] for item in comparison["bm25"]) / len(comparison["bm25"]) if comparison["bm25"] else 0.0 ) hybrid_avg = ( sum(item["normalized_similarity"] for item in comparison["hybrid"]) / len(comparison["hybrid"]) if comparison["hybrid"] else 0.0 ) comparison["method_summary"] = { "bm25_top1": bm25_top1, "hybrid_top1": hybrid_top1, "bm25_avg_top3": bm25_avg, "hybrid_avg_top3": hybrid_avg, } return comparison def generate_markdown_report(test_queries: List[str]) -> str: """ 生成 Markdown 格式的对比报告 Args: test_queries: 测试查询列表 Returns: Markdown 格式的报告字符串 """ report_lines = [] # 报告标题 report_lines.append("# Milvus 搜索方式对比测试报告") report_lines.append("") report_lines.append(f"**测试时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") report_lines.append(f"**测试 Collection**: `{CHILD_COLLECTION_NAME}`") report_lines.append(f"**测试查询数**: {len(test_queries)}") report_lines.append("") # 目录 report_lines.append("## 目录") report_lines.append("") report_lines.append("1. [测试概述](#1-测试概述)") for i, query in enumerate(test_queries, 1): safe_query = query.replace(" ", "-").replace("/", "-").replace("\\", "-") report_lines.append(f"2. [Case {i}: {query}](#2-case-{i}-{safe_query})") report_lines.append(f"{len(test_queries) + 2}. [结论与建议](#{len(test_queries) + 2}-结论与建议)") report_lines.append("") # 1. 测试概述 report_lines.append("## 1. 测试概述") report_lines.append("") report_lines.append("### 1.1 测试目的") report_lines.append("") report_lines.append("对比 BM25 全文检索与混合搜索(Hybrid Search)在不同查询场景下的召回效果,评估两种搜索方式的优劣。") report_lines.append("") report_lines.append("### 1.2 搜索方式说明") report_lines.append("") report_lines.append("| 搜索方式 | 原理 | 适用场景 |") report_lines.append("|---------|------|---------|") report_lines.append("| **BM25 全文检索** | 基于中文分词器(jieba)进行关键词匹配,计算词频-逆文档频率 | 精确匹配关键词、专业术语搜索 |") report_lines.append("| **混合搜索** | 结合向量相似度(Dense)和 BM25(Sparse)加权融合 | 语义理解 + 关键词匹配的综合场景 |") report_lines.append("") report_lines.append("### 1.3 测试查询") report_lines.append("") report_lines.append("本次测试使用以下查询:") report_lines.append("") for i, query in enumerate(test_queries, 1): report_lines.append(f"{i}. `{query}`") report_lines.append("") case_summaries = [] # 2. 各 Case 详细对比 for case_idx, query in enumerate(test_queries, 1): report_lines.append(f"## 2. Case {case_idx}: `{query}`") report_lines.append("") # 执行搜索 try: bm25_results = search_by_bm25(query, top_k=3) hybrid_results = hybrid_search(query, top_k=3) bm25_results = enrich_with_unified_similarity(query, bm25_results) hybrid_results = enrich_with_unified_similarity(query, hybrid_results) except Exception as e: report_lines.append(f"⚠️ 搜索失败: {e}") report_lines.append("") continue # 2.1 查询分析 report_lines.append(f"### 2.{case_idx}.1 查询分析") report_lines.append("") report_lines.append(f"- **查询文本**: `{query}`") report_lines.append("") # 2.2 BM25 全文检索结果 report_lines.append(f"### 2.{case_idx}.2 BM25 全文检索结果(Top3 原文)") report_lines.append("") if bm25_results: for i, result in enumerate(bm25_results, 1): retrieval_score = result.get('distance', 0) normalized_similarity = result.get('normalized_similarity', 0) text = result.get('text', '') doc_id = result.get('document_id', 'N/A') doc_type = result.get('metadata', {}).get('document_type', 'N/A') report_lines.append( f"**Top {i}** (检索分数: {retrieval_score:.4f} | 统一相似度: {normalized_similarity:.4f})" ) report_lines.append("") report_lines.append(f"- **文档ID**: `{doc_id}`") report_lines.append(f"- **文档类型**: `{doc_type}`") report_lines.append(f"- **原文内容**:") report_lines.append("") report_lines.append("```text") report_lines.append(text if text else "(无文本内容)") report_lines.append("```") report_lines.append("") else: report_lines.append("⚠️ 未检索到结果") report_lines.append("") # 2.3 混合搜索结果 report_lines.append(f"### 2.{case_idx}.3 混合搜索结果(Top3 原文)") report_lines.append("") if hybrid_results: for i, result in enumerate(hybrid_results, 1): retrieval_score = result.get('distance', 0) normalized_similarity = result.get('normalized_similarity', 0) text = result.get('text', '') doc_id = result.get('document_id', 'N/A') doc_type = result.get('metadata', {}).get('document_type', 'N/A') report_lines.append( f"**Top {i}** (检索分数: {retrieval_score:.4f} | 统一相似度: {normalized_similarity:.4f})" ) report_lines.append("") report_lines.append(f"- **文档ID**: `{doc_id}`") report_lines.append(f"- **文档类型**: `{doc_type}`") report_lines.append(f"- **原文内容**:") report_lines.append("") report_lines.append("```text") report_lines.append(text if text else "(无文本内容)") report_lines.append("```") report_lines.append("") else: report_lines.append("⚠️ 未检索到结果") report_lines.append("") # 2.4 相似度对比分析 report_lines.append(f"### 2.{case_idx}.4 相似度对比分析") report_lines.append("") # 执行对比分析 comparison = compare_search_results(bm25_results, hybrid_results) # 2.4.1 重叠度分析 report_lines.append(f"#### 2.{case_idx}.4.1 重叠度分析") report_lines.append("") overlap = comparison["overlap_analysis"] report_lines.append("| 指标 | 数值 |") report_lines.append("|-----|-----|") report_lines.append(f"| BM25 召回数 | {len(bm25_results)} |") report_lines.append(f"| 混合搜索召回数 | {len(hybrid_results)} |") report_lines.append(f"| 共同召回数 | {overlap['common_count']} |") report_lines.append(f"| BM25 独有 | {overlap['bm25_only_count']} |") report_lines.append(f"| 混合搜索独有 | {overlap['hybrid_only_count']} |") report_lines.append(f"| 重叠率 | {overlap['overlap_rate'] * 100:.1f}% |") report_lines.append("") if overlap['common_ids']: report_lines.append("✅ **共同召回的文档**: 两种搜索方式都找到了以下文档") report_lines.append("") for doc_id in overlap['common_ids']: report_lines.append(f"- ID: `{doc_id}`") report_lines.append("") else: report_lines.append("⚠️ **结果差异**: 两种搜索方式召回的文档完全不同,说明查询语义和关键词匹配存在差异") report_lines.append("") # 2.4.2 归一化相似度对比 report_lines.append(f"#### 2.{case_idx}.4.2 统一相似度对比") report_lines.append("") report_lines.append("为便于可比,统一相似度定义为:查询词与召回原文的语义相似度(cosine),并映射到 0-1。") report_lines.append("") report_lines.append("| 排名 | 文档ID | BM25 统一相似度 | 混合检索统一相似度 | 差异 |") report_lines.append("|-----|--------|----------------|--------------------|------|") # 合并两种搜索的结果进行对比 all_ids = set(overlap['common_ids'] + overlap['bm25_only_ids'] + overlap['hybrid_only_ids']) for rank, doc_id in enumerate(all_ids, 1): bm25_info = next((r for r in comparison["bm25"] if r['id'] == doc_id), None) hybrid_info = next((r for r in comparison["hybrid"] if r['id'] == doc_id), None) bm25_norm = bm25_info['normalized_similarity'] if bm25_info else 'N/A' hybrid_norm = hybrid_info['normalized_similarity'] if hybrid_info else 'N/A' if bm25_norm != 'N/A' and hybrid_norm != 'N/A': score_diff = abs(bm25_norm - hybrid_norm) score_diff_str = f"{score_diff:.4f}" else: score_diff_str = 'N/A' report_lines.append( f"| {rank} | `{doc_id}` | " f"{f'{bm25_norm:.4f}' if bm25_norm != 'N/A' else 'N/A'} | " f"{f'{hybrid_norm:.4f}' if hybrid_norm != 'N/A' else 'N/A'} | " f"{score_diff_str} |" ) report_lines.append("") # 2.4.3 排名对比(针对共同结果) if comparison["ranking_comparison"]: report_lines.append(f"#### 2.{case_idx}.4.3 排名对比(共同结果)") report_lines.append("") report_lines.append("| 文档ID | BM25 排名 | 混合搜索排名 | 排名差异 |") report_lines.append("|--------|----------|-------------|---------|") for rank_comp in comparison["ranking_comparison"]: report_lines.append(f"| `{rank_comp['id']}` | Top{rank_comp['bm25_rank']} | Top{rank_comp['hybrid_rank']} | {rank_comp['rank_diff']} |") report_lines.append("") # 2.4.4 分析总结 report_lines.append(f"#### 2.{case_idx}.4.4 分析总结") report_lines.append("") method_summary = comparison["method_summary"] bm25_top1 = method_summary["bm25_top1"] hybrid_top1 = method_summary["hybrid_top1"] bm25_avg = method_summary["bm25_avg_top3"] hybrid_avg = method_summary["hybrid_avg_top3"] report_lines.append( f"- **Top1 统一相似度**: BM25={bm25_top1:.4f},混合检索={hybrid_top1:.4f}" ) report_lines.append( f"- **Top3 平均统一相似度**: BM25={bm25_avg:.4f},混合检索={hybrid_avg:.4f}" ) if hybrid_avg > bm25_avg: case_conclusion = "本 Case 中混合检索整体更优(按 Top3 平均统一相似度)。" elif hybrid_avg < bm25_avg: case_conclusion = "本 Case 中 BM25 全文检索整体更优(按 Top3 平均统一相似度)。" else: case_conclusion = "本 Case 中两种检索表现接近(按 Top3 平均统一相似度)。" report_lines.append(f"- **Case 结论**: {case_conclusion}") if overlap['overlap_rate'] >= 0.67: report_lines.append("- **高度一致**: 两种搜索方式召回结果高度重合,说明查询词在文档中有明确匹配") elif overlap['overlap_rate'] >= 0.33: report_lines.append("- **部分一致**: 两种搜索方式部分重合,混合搜索引入了语义相关的其他文档") else: report_lines.append("- **差异较大**: 两种搜索方式召回结果差异较大,混合搜索更侧重语义理解") report_lines.append("") case_summaries.append({ "query": query, "bm25_avg": bm25_avg, "hybrid_avg": hybrid_avg, "winner": "hybrid" if hybrid_avg > bm25_avg else ("bm25" if bm25_avg > hybrid_avg else "tie") }) # 3. 结论与建议 report_lines.append(f"## {len(test_queries) + 2}. 结论与建议") report_lines.append("") report_lines.append("### 结论") report_lines.append("") if case_summaries: hybrid_win = sum(1 for item in case_summaries if item["winner"] == "hybrid") bm25_win = sum(1 for item in case_summaries if item["winner"] == "bm25") tie_count = sum(1 for item in case_summaries if item["winner"] == "tie") global_bm25_avg = sum(item["bm25_avg"] for item in case_summaries) / len(case_summaries) global_hybrid_avg = sum(item["hybrid_avg"] for item in case_summaries) / len(case_summaries) report_lines.append( f"- 基于统一相似度(query-原文 cosine,映射到 0-1)," f"混合检索胜出 {hybrid_win} 个 case,BM25 胜出 {bm25_win} 个 case,平局 {tie_count} 个 case。" ) report_lines.append( f"- 全部 case 的 Top3 平均统一相似度:BM25={global_bm25_avg:.4f},混合检索={global_hybrid_avg:.4f}。" ) if global_hybrid_avg > global_bm25_avg: report_lines.append("- 综合结论:混合检索整体相关性更高,建议作为默认检索方式。") elif global_hybrid_avg < global_bm25_avg: report_lines.append("- 综合结论:BM25 在当前数据上整体相关性更高,建议优先用于检索。") else: report_lines.append("- 综合结论:两种检索整体表现接近,可根据场景动态选择。") else: report_lines.append("- 本次测试未获得有效 case 结果,无法形成统计性结论。") report_lines.append("") report_lines.append("### 建议") report_lines.append("") report_lines.append("- **生产环境推荐**:使用混合搜索作为默认搜索方式,兼顾精确性和语义理解") report_lines.append("- **专业检索场景**:提供 BM25 搜索选项,满足精确匹配需求") report_lines.append("- **结果融合**:可考虑根据查询特征动态选择搜索方式") report_lines.append("") report_lines.append("---") report_lines.append("") report_lines.append("*报告生成完成*") return "\n".join(report_lines) def main(): """主函数:生成对比测试报告""" # 测试查询列表(10个Case) test_queries = [ "水土保持规划的编制主体是什么?", "水土保持规划的批准流程是什么?", "生产建设活动中,针对水土流失预防和治理有哪些具体法定要求?", "县级以上人民政府水行政主管部门在水土保持监测方面承担哪些法定职责?", "在崩塌、滑坡危险区或泥石流易发区从事取土、挖砂、采石等活动,法律责任如何规定?", "JT/T 1499-2024 中,三级配电系统的组成、各级剩余电流动作保护器的额定动作电流与分断时间要求分别是什么?", "依据公路水运工程临时用电技术规程,TN-S 系统接地电阻要求有哪些?", "建筑机械使用安全技术规程中特种设备操作人员要求?", "环境与消防基本规定", "灌注桩冬期施工", ] print("=" * 60) print("开始生成 Milvus 搜索对比测试报告...") print("=" * 60) # 生成报告 report = generate_markdown_report(test_queries) # 保存报告 report_filename = f"search_comparison_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.md" report_dir = os.path.join(BASE_DIR, "utils_test", "RAG_Test", "reports") os.makedirs(report_dir, exist_ok=True) report_path = os.path.join(report_dir, report_filename) with open(report_path, "w", encoding="utf-8") as f: f.write(report) print(f"\n✅ 报告已生成: {report_path}") print(f"\n📊 测试查询数: {len(test_queries)}") print("\n测试查询列表:") for i, query in enumerate(test_queries, 1): print(f" {i}. {query}") # 同时打印报告内容预览 print("\n" + "=" * 60) print("报告预览(前 2000 字符):") print("=" * 60) print(report[:2000]) print("\n... (报告内容已截断)") if __name__ == "__main__": main()