#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Hybrid-retrieval diagnostic script.

Runs a fixed sequence of checks against a Milvus collection to help
investigate why ``hybrid_search`` returns 0 results.
"""

import os
import sys
import traceback

# Make the project root importable when the script is run directly.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from pymilvus import connections, Collection, utility
from foundation.ai.models.model_handler import model_handler
from foundation.observability.logger.loggering import review_logger as logger

# Connection alias used by every pymilvus call in this script.
MILVUS_ALIAS = "debug"
# Milvus database the diagnosed collection lives in.
MILVUS_DB_NAME = "lq_db"
# Default diagnosis target and probe query used by main().
DEFAULT_COLLECTION_NAME = "first_bfp_collection_entity"
DEFAULT_QUERY_TEXT = "高空作业"


def _get_milvus_endpoint():
    """Return ``(host, port)`` read from the ``milvus`` config section.

    The config module is imported lazily so that an import or lookup failure
    is reported by the calling check instead of aborting the script at
    import time.
    """
    from foundation.infrastructure.config.config import config_handler

    host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
    port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
    return host, port


def _find_dense_vector_field(fields):
    """Return the name of the first ``FLOAT_VECTOR`` field, or ``None``.

    The enum member *name* is compared exactly. A substring test on
    ``str(field.dtype)`` would also match ``SPARSE_FLOAT_VECTOR``, and
    ``str()`` of an ``IntEnum`` member is its integer value on Python 3.11+.
    NOTE(review): assumes pymilvus ``DataType`` is an ``IntEnum`` -- confirm.
    """
    return next(
        (field.name for field in fields if field.dtype.name == "FLOAT_VECTOR"),
        None,
    )


def check_milvus_connection() -> bool:
    """Open the ``MILVUS_ALIAS`` connection; return ``True`` on success."""
    print("\n" + "="*60)
    print("1. 检查 Milvus 连接")
    print("="*60)

    try:
        host, port = _get_milvus_endpoint()
        connections.connect(
            alias=MILVUS_ALIAS,
            host=host,
            port=port,
            db_name=MILVUS_DB_NAME
        )
        print(f"✅ Milvus 连接成功: {host}:{port}")
        return True
    except Exception as e:
        print(f"❌ Milvus 连接失败: {e}")
        return False


def check_collection_exists(collection_name: str) -> bool:
    """Report whether ``collection_name`` exists on the diagnostic connection.

    Returns ``False`` both when the collection is missing and when the
    lookup itself fails, matching the other checks in this script.
    """
    print(f"\n2. 检查 Collection 是否存在: {collection_name}")
    print("-"*60)

    try:
        exists = utility.has_collection(collection_name, using=MILVUS_ALIAS)
    except Exception as e:
        print(f"❌ 检查 Collection 失败: {e}")
        return False

    if exists:
        print(f"✅ Collection '{collection_name}' 存在")
    else:
        print(f"❌ Collection '{collection_name}' 不存在!")
    return exists


def check_collection_schema(collection_name: str) -> bool:
    """Print the collection schema and check for the hybrid-search fields.

    Returns ``True`` only when the schema has both a field named ``dense``
    and a field named ``sparse``; ``False`` otherwise or on any error.
    """
    print(f"\n3. 检查 Collection Schema 结构")
    print("-"*60)

    try:
        col = Collection(collection_name, using=MILVUS_ALIAS)
        schema = col.schema

        print(f"Collection: {collection_name}")
        print(f"Description: {schema.description}")
        print(f"\n字段列表:")

        field_names = []
        for field in schema.fields:
            field_names.append(field.name)
            print(f" - {field.name}: {field.dtype.name}", end="")
            dim = getattr(field, 'dim', None)
            if dim:
                print(f" (dim={dim})", end="")
            if field.is_primary:
                print(" [PRIMARY]", end="")
            print()

        # Hybrid search is keyed on these two exact field names.
        has_dense = "dense" in field_names
        has_sparse = "sparse" in field_names

        print(f"\n混合搜索所需字段检查:")
        print(f" - dense 字段: {'✅ 存在' if has_dense else '❌ 不存在'}")
        print(f" - sparse 字段: {'✅ 存在' if has_sparse else '❌ 不存在'}")

        if not (has_dense and has_sparse):
            print(f"\n⚠️ 警告: Collection 缺少混合搜索所需的字段!")
            print(f" 混合搜索需要 'dense' 和 'sparse' 两个字段")
            print(f" 当前字段: {field_names}")

        return has_dense and has_sparse
    except Exception as e:
        print(f"❌ 获取 Schema 失败: {e}")
        return False


def check_collection_data(collection_name: str) -> bool:
    """Load the collection and report whether it holds any entities."""
    print(f"\n4. 检查 Collection 数据量")
    print("-"*60)

    try:
        col = Collection(collection_name, using=MILVUS_ALIAS)
        col.load()

        num_entities = col.num_entities
        print(f"数据量: {num_entities} 条")

        if num_entities == 0:
            print("❌ Collection 为空,没有数据!")
            return False

        print("✅ Collection 有数据")
        return True
    except Exception as e:
        print(f"❌ 获取数据量失败: {e}")
        return False


def check_collection_index(collection_name: str) -> bool:
    """Print every index on the collection; ``True`` if at least one exists."""
    print(f"\n5. 检查 Collection 索引")
    print("-"*60)

    try:
        col = Collection(collection_name, using=MILVUS_ALIAS)
        indexes = col.indexes

        if not indexes:
            print("❌ 没有索引!")
            return False

        for idx in indexes:
            print(f" - 字段: {idx.field_name}")
            print(f" 索引参数: {idx.params}")

        print("✅ 索引存在")
        return True
    except Exception as e:
        print(f"❌ 获取索引失败: {e}")
        return False


def test_traditional_search(collection_name: str, query_text: str) -> bool:
    """Run a plain dense-vector search as a baseline (no hybrid search).

    Embeds ``query_text`` with the project embedding model and searches the
    first ``FLOAT_VECTOR`` field of the collection. Returns ``True`` when at
    least one hit comes back; ``False`` on zero hits or any error.
    """
    print(f"\n6. 测试传统向量搜索")
    print("-"*60)

    try:
        col = Collection(collection_name, using=MILVUS_ALIAS)
        col.load()

        # Embed the query text.
        emdmodel = model_handler.get_embedding_model()
        query_vector = emdmodel.embed_query(query_text)

        print(f"查询文本: {query_text}")
        print(f"向量维度: {len(query_vector)}")

        # Pick the dense vector field to search against.
        fields = col.schema.fields
        vector_field = _find_dense_vector_field(fields)
        if not vector_field:
            print("❌ 未找到向量字段")
            return False

        print(f"向量字段: {vector_field}")

        # Only id/distance are printed below, so request "text" only when the
        # schema actually has it; otherwise the baseline search would fail
        # for a reason unrelated to the search itself.
        has_text_field = any(field.name == "text" for field in fields)
        output_fields = ["text"] if has_text_field else []

        search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
        results = col.search(
            data=[query_vector],
            anns_field=vector_field,
            param=search_params,
            limit=5,
            output_fields=output_fields
        )

        hits = results[0]
        print(f"\n搜索结果: {len(hits)} 条")
        for rank, hit in enumerate(hits, start=1):
            print(f" {rank}. ID={hit.id}, 距离={hit.distance:.4f}")

        if len(hits) > 0:
            print("✅ 传统向量搜索正常")
            return True

        print("❌ 传统向量搜索也返回0结果")
        return False
    except Exception as e:
        print(f"❌ 传统搜索失败: {e}")
        traceback.print_exc()
        return False


def test_langchain_hybrid_search(collection_name: str, query_text: str) -> bool:
    """Run a weighted hybrid search through ``langchain_milvus.Milvus``.

    Builds a vector store over the ``dense`` and ``sparse`` fields with the
    BM25 built-in function, then queries it. Returns ``True`` when at least
    one result comes back; ``False`` on zero results or any error.
    """
    print(f"\n7. 测试 LangChain Milvus 混合搜索")
    print("-"*60)

    try:
        from langchain_milvus import Milvus, BM25BuiltInFunction

        host, port = _get_milvus_endpoint()
        connection_args = {
            "uri": f"http://{host}:{port}",
            "db_name": MILVUS_DB_NAME
        }

        emdmodel = model_handler.get_embedding_model()

        print(f"尝试连接 Collection: {collection_name}")
        print(f"连接参数: {connection_args}")

        # Try to build the vector store.
        vectorstore = Milvus(
            embedding_function=emdmodel,
            collection_name=collection_name,
            connection_args=connection_args,
            consistency_level="Strong",
            builtin_function=BM25BuiltInFunction(),
            vector_field=["dense", "sparse"]
        )

        print("✅ Vectorstore 创建成功")

        # Run the hybrid search.
        print(f"\n执行混合搜索,查询: {query_text}")

        results = vectorstore.similarity_search_with_score(
            query=query_text,
            k=5,
            ranker_type="weighted",
            ranker_params={"weights": [0.7, 0.3]}
        )

        print(f"搜索结果: {len(results)} 条")
        for rank, (doc, score) in enumerate(results, start=1):
            content = doc.page_content[:50] if doc.page_content else "N/A"
            print(f" {rank}. score={score:.4f}, content={content}...")

        if len(results) > 0:
            print("✅ 混合搜索正常")
            return True

        print("❌ 混合搜索返回0结果")
        return False
    except Exception as e:
        print(f"❌ 混合搜索失败: {e}")
        traceback.print_exc()
        return False


def test_retrieval_manager(collection_name: str, query_text: str) -> bool:
    """Run the project's ``retrieval_manager.hybrid_search_recall``.

    Returns ``True`` when at least one result comes back; ``False`` on zero
    results or any error.
    """
    print(f"\n8. 测试 RetrievalManager 混合搜索")
    print("-"*60)

    try:
        from foundation.ai.rag.retrieval.retrieval import retrieval_manager

        results = retrieval_manager.hybrid_search_recall(
            collection_name=collection_name,
            query_text=query_text,
            top_k=5,
            ranker_type="weighted",
            dense_weight=0.7,
            sparse_weight=0.3
        )

        print(f"搜索结果: {len(results)} 条")
        for rank, result in enumerate(results, start=1):
            # A stored None must not break the preview slice.
            content = (result.get('text_content') or '')[:50]
            print(f" {rank}. {content}...")

        if len(results) > 0:
            print("✅ RetrievalManager 混合搜索正常")
            return True

        print("❌ RetrievalManager 混合搜索返回0结果")
        return False
    except Exception as e:
        print(f"❌ RetrievalManager 测试失败: {e}")
        traceback.print_exc()
        return False


def _print_summary(results: dict) -> None:
    """Print one pass/fail line per executed check, in execution order."""
    print("\n" + "="*60)
    print("诊断总结")
    print("="*60)

    for key, value in results.items():
        status = "✅" if value else "❌"
        print(f" {status} {key}")


def _print_advice(results: dict) -> None:
    """Print remediation advice derived from the check results."""
    print("\n" + "="*60)
    print("问题分析与建议")
    print("="*60)

    if not results.get('schema'):
        print("""
⚠️ 主要问题: Collection Schema 不支持混合搜索
原因:
  Collection 缺少 'dense' 和 'sparse' 字段
  混合搜索需要在创建 Collection 时使用 BM25BuiltInFunction
解决方案:
  1. 使用 create_hybrid_collection 方法重新创建 Collection
  2. 或者修改代码,对不支持混合搜索的 Collection 使用传统向量搜索
""")

    if results.get('traditional') and not results.get('langchain'):
        print("""
⚠️ 问题: 传统搜索正常,但混合搜索失败
可能原因:
  1. Collection 创建时未启用 BM25 功能
  2. LangChain Milvus 版本兼容性问题
  3. vector_field 配置与实际字段名不匹配
建议:
  1. 检查 Collection 创建方式
  2. 确认 langchain-milvus 版本
""")

    if not results.get('data'):
        print("""
⚠️ 问题: Collection 为空
解决方案:
  先向 Collection 中导入数据
""")


def main(collection_name: str = DEFAULT_COLLECTION_NAME,
         query_text: str = DEFAULT_QUERY_TEXT) -> None:
    """Run all diagnostics in order, then print a summary and advice.

    Args:
        collection_name: Collection to diagnose.
        query_text: Query used by the three search checks.

    Stops early, without a summary, when the connection fails or the
    collection does not exist. The diagnostic connection is always released
    once it has been opened.
    """
    print("\n" + "="*60)
    print("混合检索问题诊断")
    print("="*60)

    print(f"\n诊断目标:")
    print(f" - Collection: {collection_name}")
    print(f" - 查询文本: {query_text}")

    results = {}

    # 1. Connection
    results['connection'] = check_milvus_connection()
    if not results['connection']:
        print("\n❌ Milvus 连接失败,无法继续诊断")
        return

    try:
        # 2. Collection existence
        results['exists'] = check_collection_exists(collection_name)
        if not results['exists']:
            print(f"\n❌ Collection '{collection_name}' 不存在,无法继续诊断")
            return

        # 3. Schema
        results['schema'] = check_collection_schema(collection_name)

        # 4. Entity count
        results['data'] = check_collection_data(collection_name)

        # 5. Indexes
        results['index'] = check_collection_index(collection_name)

        # 6. Plain vector search
        results['traditional'] = test_traditional_search(collection_name, query_text)

        # 7. LangChain hybrid search
        results['langchain'] = test_langchain_hybrid_search(collection_name, query_text)

        # 8. RetrievalManager hybrid search
        results['retrieval'] = test_retrieval_manager(collection_name, query_text)
    finally:
        # Release the diagnostic connection on every exit path.
        connections.disconnect(MILVUS_ALIAS)

    _print_summary(results)
    _print_advice(results)


if __name__ == "__main__":
    main()