| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- """
- 混合检索问题诊断脚本
- 用于排查 hybrid_search 返回0结果的问题
- """
- import sys
- import os
- sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
- from pymilvus import connections, Collection, utility
- from foundation.ai.models.model_handler import model_handler
- from foundation.observability.logger.loggering import review_logger as logger
def check_milvus_connection():
    """Open the Milvus connection registered under the ``debug`` alias.

    Host and port are looked up through ``config_handler`` (section
    ``milvus``, keys ``MILVUS_HOST`` / ``MILVUS_PORT``); the third argument
    passed to ``get`` is presumably the fallback value. The database name
    is fixed to ``lq_db``.

    Returns:
        bool: True if the connect call completed, False if the config
        lookup or the connect call raised.
    """
    separator = "=" * 60
    print("\n" + separator)
    print("1. 检查 Milvus 连接")
    print(separator)

    try:
        # The import lives inside the try block, so an import error is
        # reported as a connection failure rather than propagating.
        from foundation.infrastructure.config.config import config_handler

        milvus_host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
        milvus_port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))

        connect_kwargs = {
            "alias": "debug",
            "host": milvus_host,
            "port": milvus_port,
            "db_name": "lq_db",
        }
        connections.connect(**connect_kwargs)
        print(f"✅ Milvus 连接成功: {milvus_host}:{milvus_port}")
    except Exception as exc:
        print(f"❌ Milvus 连接失败: {exc}")
        return False
    return True
def check_collection_exists(collection_name: str):
    """Report whether ``collection_name`` exists on the ``debug`` connection.

    Args:
        collection_name: Name of the Milvus collection to look up.

    Returns:
        The value returned by ``utility.has_collection``. Errors raised by
        that call are not caught here.
    """
    print(f"\n2. 检查 Collection 是否存在: {collection_name}")
    print("-" * 60)

    found = utility.has_collection(collection_name, using="debug")
    message = (
        f"✅ Collection '{collection_name}' 存在"
        if found
        else f"❌ Collection '{collection_name}' 不存在!"
    )
    print(message)
    return found
def check_collection_schema(collection_name: str):
    """Print the collection's fields and check for the hybrid-search fields.

    Every field is listed with its dtype name, its ``dim`` when one is set,
    and a ``[PRIMARY]`` tag for the primary key. The collection is treated
    as hybrid-search capable only when it has fields named exactly
    ``dense`` and ``sparse``.

    Args:
        collection_name: Name of the Milvus collection to inspect.

    Returns:
        bool: True when both ``dense`` and ``sparse`` fields exist, False
        otherwise or when reading the schema raised.
    """
    print("\n3. 检查 Collection Schema 结构")
    print("-" * 60)

    try:
        schema = Collection(collection_name, using="debug").schema

        print(f"Collection: {collection_name}")
        print(f"Description: {schema.description}")
        print("\n字段列表:")

        field_names = []
        for fld in schema.fields:
            field_names.append(fld.name)

            # Assemble the description of one field, then emit it as a
            # single line.
            pieces = [f" - {fld.name}: {fld.dtype.name}"]
            if hasattr(fld, 'dim') and fld.dim:
                pieces.append(f" (dim={fld.dim})")
            if fld.is_primary:
                pieces.append(" [PRIMARY]")
            print("".join(pieces))

        # The two field names the hybrid search in this script relies on.
        has_dense = "dense" in field_names
        has_sparse = "sparse" in field_names

        print("\n混合搜索所需字段检查:")
        print(f" - dense 字段: {'✅ 存在' if has_dense else '❌ 不存在'}")
        print(f" - sparse 字段: {'✅ 存在' if has_sparse else '❌ 不存在'}")

        hybrid_ready = has_dense and has_sparse
        if not hybrid_ready:
            print("\n⚠️ 警告: Collection 缺少混合搜索所需的字段!")
            print(" 混合搜索需要 'dense' 和 'sparse' 两个字段")
            print(f" 当前字段: {field_names}")

        return hybrid_ready

    except Exception as exc:
        print(f"❌ 获取 Schema 失败: {exc}")
        return False
def check_collection_data(collection_name: str):
    """Report how many entities the collection holds.

    The collection is deliberately NOT loaded here: reading
    ``num_entities`` does not require it, and loading can fail (for
    example when no index has been built yet), which would make this step
    report a data problem that is really an index problem.

    Args:
        collection_name: Name of the Milvus collection to inspect.

    Returns:
        bool: True when the collection reports at least one entity, False
        when it is empty or the count could not be read.
    """
    print("\n4. 检查 Collection 数据量")
    print("-" * 60)

    try:
        col = Collection(collection_name, using="debug")
        # NOTE(review): the count presumably comes from collection
        # statistics, so rows that were inserted but not yet flushed may
        # be missing from it - confirm against the pymilvus documentation.
        num_entities = col.num_entities

        print(f"数据量: {num_entities} 条")

        if num_entities == 0:
            print("❌ Collection 为空,没有数据!")
            return False

        print("✅ Collection 有数据")
        return True

    except Exception as e:
        print(f"❌ 获取数据量失败: {e}")
        return False
def check_collection_index(collection_name: str):
    """List the indexes defined on the collection.

    Args:
        collection_name: Name of the Milvus collection to inspect.

    Returns:
        bool: True when at least one index exists, False when there is
        none or when reading the indexes raised.
    """
    print("\n5. 检查 Collection 索引")
    print("-" * 60)

    try:
        index_list = Collection(collection_name, using="debug").indexes

        if index_list:
            for index in index_list:
                print(f" - 字段: {index.field_name}")
                print(f" 索引参数: {index.params}")
            print("✅ 索引存在")
            return True

        print("❌ 没有索引!")
        return False

    except Exception as exc:
        print(f"❌ 获取索引失败: {exc}")
        return False
def test_traditional_search(collection_name: str, query_text: str):
    """Run a plain dense-vector search, bypassing the hybrid-search path.

    The query text is embedded with the project's embedding model and
    searched against the first ``FLOAT_VECTOR`` field of the collection,
    using the COSINE metric and a limit of 5 hits.

    Args:
        collection_name: Name of the Milvus collection to search.
        query_text: Text to embed and search for.

    Returns:
        bool: True when the search returned at least one hit, False when
        it returned none, no dense vector field exists, or any step raised.
    """
    print("\n6. 测试传统向量搜索")
    print("-" * 60)

    try:
        col = Collection(collection_name, using="debug")
        col.load()

        # Embed the query text.
        emdmodel = model_handler.get_embedding_model()
        query_vector = emdmodel.embed_query(query_text)

        print(f"查询文本: {query_text}")
        print(f"向量维度: {len(query_vector)}")

        # Pick the first dense float-vector field. Compare the enum
        # member's name exactly instead of substring-matching str(dtype):
        # an IntEnum member prints as a bare integer on Python 3.11+, so
        # the substring test would never match there, and where it does
        # match it also accepts SPARSE_FLOAT_VECTOR, which cannot be
        # searched with a dense query vector.
        vector_field = next(
            (
                field.name
                for field in col.schema.fields
                if field.dtype.name == "FLOAT_VECTOR"
            ),
            None,
        )

        if vector_field is None:
            print("❌ 未找到向量字段")
            return False

        print(f"向量字段: {vector_field}")

        # Run the search.
        search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
        results = col.search(
            data=[query_vector],
            anns_field=vector_field,
            param=search_params,
            limit=5,
            output_fields=["text"]
        )

        # One query vector was sent, so only the first result set matters.
        hits = results[0]
        print(f"\n搜索结果: {len(hits)} 条")
        for rank, hit in enumerate(hits, start=1):
            print(f" {rank}. ID={hit.id}, 距离={hit.distance:.4f}")

        if len(hits) > 0:
            print("✅ 传统向量搜索正常")
            return True

        print("❌ 传统向量搜索也返回0结果")
        return False

    except Exception as e:
        print(f"❌ 传统搜索失败: {e}")
        import traceback
        traceback.print_exc()
        return False
def test_langchain_hybrid_search(collection_name: str, query_text: str):
    """Run a hybrid search through the LangChain Milvus vector store.

    A ``Milvus`` vector store is built over the ``dense`` and ``sparse``
    fields with ``BM25BuiltInFunction``, then queried with a weighted
    ranker (weights ``[0.7, 0.3]``) for the top 5 results.

    Args:
        collection_name: Name of the Milvus collection to search.
        query_text: Query text for the hybrid search.

    Returns:
        bool: True when at least one result came back, False when none did
        or when any step (imports included) raised.
    """
    print("\n7. 测试 LangChain Milvus 混合搜索")
    print("-" * 60)

    try:
        from langchain_milvus import Milvus, BM25BuiltInFunction
        from foundation.infrastructure.config.config import config_handler

        milvus_host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
        milvus_port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
        connection_args = {
            "uri": f"http://{milvus_host}:{milvus_port}",
            "db_name": "lq_db"
        }

        embedding_model = model_handler.get_embedding_model()

        print(f"尝试连接 Collection: {collection_name}")
        print(f"连接参数: {connection_args}")

        # Build the vector store over both vector fields.
        store = Milvus(
            embedding_function=embedding_model,
            collection_name=collection_name,
            connection_args=connection_args,
            consistency_level="Strong",
            builtin_function=BM25BuiltInFunction(),
            vector_field=["dense", "sparse"]
        )
        print("✅ Vectorstore 创建成功")

        # Run the hybrid query.
        print(f"\n执行混合搜索,查询: {query_text}")
        scored_docs = store.similarity_search_with_score(
            query=query_text,
            k=5,
            ranker_type="weighted",
            ranker_params={"weights": [0.7, 0.3]}
        )

        print(f"搜索结果: {len(scored_docs)} 条")
        for rank, (doc, score) in enumerate(scored_docs, start=1):
            # Show at most the first 50 characters of each document.
            if doc.page_content:
                content = doc.page_content[:50]
            else:
                content = "N/A"
            print(f" {rank}. score={score:.4f}, content={content}...")

        if len(scored_docs) > 0:
            print("✅ 混合搜索正常")
            return True

        print("❌ 混合搜索返回0结果")
        return False

    except Exception as exc:
        print(f"❌ 混合搜索失败: {exc}")
        import traceback
        traceback.print_exc()
        return False
def test_retrieval_manager(collection_name: str, query_text: str):
    """Run the project's ``retrieval_manager.hybrid_search_recall``.

    The call asks for the top 5 results with a weighted ranker
    (dense weight 0.7, sparse weight 0.3) and prints the first 50
    characters of each result's ``text_content``.

    Args:
        collection_name: Name of the Milvus collection to search.
        query_text: Query text for the hybrid search.

    Returns:
        bool: True when at least one result came back, False when none did
        or when the import or the search raised.
    """
    print("\n8. 测试 RetrievalManager 混合搜索")
    print("-" * 60)

    try:
        from foundation.ai.rag.retrieval.retrieval import retrieval_manager

        recall_kwargs = {
            "collection_name": collection_name,
            "query_text": query_text,
            "top_k": 5,
            "ranker_type": "weighted",
            "dense_weight": 0.7,
            "sparse_weight": 0.3,
        }
        recalled = retrieval_manager.hybrid_search_recall(**recall_kwargs)

        print(f"搜索结果: {len(recalled)} 条")
        for rank, item in enumerate(recalled, start=1):
            preview = item.get('text_content', '')[:50]
            print(f" {rank}. {preview}...")

        if len(recalled) > 0:
            print("✅ RetrievalManager 混合搜索正常")
            return True

        print("❌ RetrievalManager 混合搜索返回0结果")
        return False

    except Exception as exc:
        print(f"❌ RetrievalManager 测试失败: {exc}")
        import traceback
        traceback.print_exc()
        return False
def main(collection_name: str = "first_bfp_collection_entity",
         query_text: str = "高空作业"):
    """Run all diagnostic steps in order, then print a summary and advice.

    Steps 1 (connection) and 2 (collection exists) are prerequisites: if
    either fails the run stops early. Steps 3-8 always run, and each one
    records a pass/fail flag that feeds the summary and the advice section.

    Args:
        collection_name: Milvus collection to diagnose. The default is the
            collection this script was originally hard-coded for.
        query_text: Query text used by the three search tests.

    Returns:
        None. All findings are printed to stdout.
    """
    banner = "=" * 60
    print("\n" + banner)
    print("混合检索问题诊断")
    print(banner)

    print("\n诊断目标:")
    print(f" - Collection: {collection_name}")
    print(f" - 查询文本: {query_text}")

    results = {}

    # 1. Connection: nothing else can run without it.
    results['connection'] = check_milvus_connection()
    if not results['connection']:
        print("\n❌ Milvus 连接失败,无法继续诊断")
        return

    try:
        # 2. Collection existence: also a hard prerequisite.
        results['exists'] = check_collection_exists(collection_name)
        if not results['exists']:
            print(f"\n❌ Collection '{collection_name}' 不存在,无法继续诊断")
            return

        # 3-5. Static checks on the collection.
        results['schema'] = check_collection_schema(collection_name)
        results['data'] = check_collection_data(collection_name)
        results['index'] = check_collection_index(collection_name)

        # 6-8. Search tests, from the lowest-level API to the project wrapper.
        results['traditional'] = test_traditional_search(collection_name, query_text)
        results['langchain'] = test_langchain_hybrid_search(collection_name, query_text)
        results['retrieval'] = test_retrieval_manager(collection_name, query_text)
    finally:
        # Release the "debug" alias opened in step 1 so the connection is
        # not left open when main() is called from another module.
        connections.disconnect("debug")

    # Summary
    print("\n" + banner)
    print("诊断总结")
    print(banner)

    for key, value in results.items():
        status = "✅" if value else "❌"
        print(f" {status} {key}")

    # Advice derived from the recorded flags
    print("\n" + banner)
    print("问题分析与建议")
    print(banner)

    if not results.get('schema'):
        print("""
⚠️ 主要问题: Collection Schema 不支持混合搜索
原因: Collection 缺少 'dense' 和 'sparse' 字段
混合搜索需要在创建 Collection 时使用 BM25BuiltInFunction
解决方案:
1. 使用 create_hybrid_collection 方法重新创建 Collection
2. 或者修改代码,对不支持混合搜索的 Collection 使用传统向量搜索
""")

    if results.get('traditional') and not results.get('langchain'):
        print("""
⚠️ 问题: 传统搜索正常,但混合搜索失败
可能原因:
1. Collection 创建时未启用 BM25 功能
2. LangChain Milvus 版本兼容性问题
3. vector_field 配置与实际字段名不匹配
建议:
1. 检查 Collection 创建方式
2. 确认 langchain-milvus 版本
""")

    if not results.get('data'):
        print("""
⚠️ 问题: Collection 为空
解决方案: 先向 Collection 中导入数据
""")


if __name__ == "__main__":
    # Optional overrides: <script> [collection_name] [query_text].
    # With no arguments the original hard-coded defaults are used.
    main(*sys.argv[1:3])
|