test_hybrid_search_debug.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. 混合检索问题诊断脚本
  5. 用于排查 hybrid_search 返回0结果的问题
  6. """
  7. import sys
  8. import os
  9. from pymilvus import connections, Collection, utility
  10. from foundation.ai.models.model_handler import model_handler
  11. from foundation.observability.logger.loggering import review_logger as logger
  12. def check_milvus_connection():
  13. """检查 Milvus 连接"""
  14. print("\n" + "="*60)
  15. print("1. 检查 Milvus 连接")
  16. print("="*60)
  17. try:
  18. from foundation.infrastructure.config.config import config_handler
  19. host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
  20. port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
  21. connections.connect(
  22. alias="debug",
  23. host=host,
  24. port=port,
  25. db_name="lq_db"
  26. )
  27. print(f"✅ Milvus 连接成功: {host}:{port}")
  28. return True
  29. except Exception as e:
  30. print(f"❌ Milvus 连接失败: {e}")
  31. return False
  32. def check_collection_exists(collection_name: str):
  33. """检查 Collection 是否存在"""
  34. print(f"\n2. 检查 Collection 是否存在: {collection_name}")
  35. print("-"*60)
  36. exists = utility.has_collection(collection_name, using="debug")
  37. if exists:
  38. print(f"✅ Collection '{collection_name}' 存在")
  39. else:
  40. print(f"❌ Collection '{collection_name}' 不存在!")
  41. return exists
  42. def check_collection_schema(collection_name: str):
  43. """检查 Collection Schema 结构"""
  44. print(f"\n3. 检查 Collection Schema 结构")
  45. print("-"*60)
  46. try:
  47. col = Collection(collection_name, using="debug")
  48. schema = col.schema
  49. print(f"Collection: {collection_name}")
  50. print(f"Description: {schema.description}")
  51. print(f"\n字段列表:")
  52. has_dense = False
  53. has_sparse = False
  54. field_names = []
  55. for field in schema.fields:
  56. field_names.append(field.name)
  57. print(f" - {field.name}: {field.dtype.name}", end="")
  58. if hasattr(field, 'dim') and field.dim:
  59. print(f" (dim={field.dim})", end="")
  60. if field.is_primary:
  61. print(" [PRIMARY]", end="")
  62. print()
  63. # 检查关键字段
  64. if field.name == "dense":
  65. has_dense = True
  66. if field.name == "sparse":
  67. has_sparse = True
  68. print(f"\n混合搜索所需字段检查:")
  69. print(f" - dense 字段: {'✅ 存在' if has_dense else '❌ 不存在'}")
  70. print(f" - sparse 字段: {'✅ 存在' if has_sparse else '❌ 不存在'}")
  71. if not has_dense or not has_sparse:
  72. print(f"\n⚠️ 警告: Collection 缺少混合搜索所需的字段!")
  73. print(f" 混合搜索需要 'dense' 和 'sparse' 两个字段")
  74. print(f" 当前字段: {field_names}")
  75. return has_dense and has_sparse
  76. except Exception as e:
  77. print(f"❌ 获取 Schema 失败: {e}")
  78. return False
  79. def check_collection_data(collection_name: str):
  80. """检查 Collection 数据量"""
  81. print(f"\n4. 检查 Collection 数据量")
  82. print("-"*60)
  83. try:
  84. col = Collection(collection_name, using="debug")
  85. col.load()
  86. num_entities = col.num_entities
  87. print(f"数据量: {num_entities} 条")
  88. if num_entities == 0:
  89. print("❌ Collection 为空,没有数据!")
  90. return False
  91. else:
  92. print("✅ Collection 有数据")
  93. return True
  94. except Exception as e:
  95. print(f"❌ 获取数据量失败: {e}")
  96. return False
  97. def check_collection_index(collection_name: str):
  98. """检查 Collection 索引"""
  99. print(f"\n5. 检查 Collection 索引")
  100. print("-"*60)
  101. try:
  102. col = Collection(collection_name, using="debug")
  103. indexes = col.indexes
  104. if not indexes:
  105. print("❌ 没有索引!")
  106. return False
  107. for idx in indexes:
  108. print(f" - 字段: {idx.field_name}")
  109. print(f" 索引参数: {idx.params}")
  110. print("✅ 索引存在")
  111. return True
  112. except Exception as e:
  113. print(f"❌ 获取索引失败: {e}")
  114. return False
  115. def test_traditional_search(collection_name: str, query_text: str):
  116. """测试传统向量搜索(不使用混合搜索)"""
  117. print(f"\n6. 测试传统向量搜索")
  118. print("-"*60)
  119. try:
  120. col = Collection(collection_name, using="debug")
  121. col.load()
  122. # 获取 embedding
  123. emdmodel = model_handler.get_embedding_model()
  124. query_vector = emdmodel.embed_query(query_text)
  125. print(f"查询文本: {query_text}")
  126. print(f"向量维度: {len(query_vector)}")
  127. # 确定向量字段名
  128. vector_field = None
  129. for field in col.schema.fields:
  130. if "FLOAT_VECTOR" in str(field.dtype):
  131. vector_field = field.name
  132. break
  133. if not vector_field:
  134. print("❌ 未找到向量字段")
  135. return False
  136. print(f"向量字段: {vector_field}")
  137. # 执行搜索
  138. search_params = {"metric_type": "COSINE", "params": {"nprobe": 10}}
  139. results = col.search(
  140. data=[query_vector],
  141. anns_field=vector_field,
  142. param=search_params,
  143. limit=5,
  144. output_fields=["text"]
  145. )
  146. print(f"\n搜索结果: {len(results[0])} 条")
  147. for i, hit in enumerate(results[0]):
  148. print(f" {i+1}. ID={hit.id}, 距离={hit.distance:.4f}")
  149. if len(results[0]) > 0:
  150. print("✅ 传统向量搜索正常")
  151. return True
  152. else:
  153. print("❌ 传统向量搜索也返回0结果")
  154. return False
  155. except Exception as e:
  156. print(f"❌ 传统搜索失败: {e}")
  157. import traceback
  158. traceback.print_exc()
  159. return False
  160. def test_langchain_hybrid_search(collection_name: str, query_text: str):
  161. """测试 LangChain Milvus 混合搜索"""
  162. print(f"\n7. 测试 LangChain Milvus 混合搜索")
  163. print("-"*60)
  164. try:
  165. from langchain_milvus import Milvus, BM25BuiltInFunction
  166. from foundation.infrastructure.config.config import config_handler
  167. host = config_handler.get('milvus', 'MILVUS_HOST', 'localhost')
  168. port = int(config_handler.get('milvus', 'MILVUS_PORT', '19530'))
  169. connection_args = {
  170. "uri": f"http://{host}:{port}",
  171. "db_name": "lq_db"
  172. }
  173. emdmodel = model_handler.get_embedding_model()
  174. print(f"尝试连接 Collection: {collection_name}")
  175. print(f"连接参数: {connection_args}")
  176. # 尝试创建 vectorstore
  177. vectorstore = Milvus(
  178. embedding_function=emdmodel,
  179. collection_name=collection_name,
  180. connection_args=connection_args,
  181. consistency_level="Strong",
  182. builtin_function=BM25BuiltInFunction(),
  183. vector_field=["dense", "sparse"]
  184. )
  185. print("✅ Vectorstore 创建成功")
  186. # 执行混合搜索
  187. print(f"\n执行混合搜索,查询: {query_text}")
  188. results = vectorstore.similarity_search_with_score(
  189. query=query_text,
  190. k=5,
  191. ranker_type="weighted",
  192. ranker_params={"weights": [0.7, 0.3]}
  193. )
  194. print(f"搜索结果: {len(results)} 条")
  195. for i, (doc, score) in enumerate(results):
  196. content = doc.page_content[:50] if doc.page_content else "N/A"
  197. print(f" {i+1}. score={score:.4f}, content={content}...")
  198. if len(results) > 0:
  199. print("✅ 混合搜索正常")
  200. return True
  201. else:
  202. print("❌ 混合搜索返回0结果")
  203. return False
  204. except Exception as e:
  205. print(f"❌ 混合搜索失败: {e}")
  206. import traceback
  207. traceback.print_exc()
  208. return False
  209. def test_retrieval_manager(collection_name: str, query_text: str):
  210. """测试 RetrievalManager 的混合搜索"""
  211. print(f"\n8. 测试 RetrievalManager 混合搜索")
  212. print("-"*60)
  213. try:
  214. from foundation.ai.rag.retrieval.retrieval import retrieval_manager
  215. results = retrieval_manager.hybrid_search_recall(
  216. collection_name=collection_name,
  217. query_text=query_text,
  218. top_k=5,
  219. ranker_type="weighted",
  220. dense_weight=0.7,
  221. sparse_weight=0.3
  222. )
  223. print(f"搜索结果: {len(results)} 条")
  224. for i, result in enumerate(results):
  225. content = result.get('text_content', '')[:50]
  226. print(f" {i+1}. {content}...")
  227. if len(results) > 0:
  228. print("✅ RetrievalManager 混合搜索正常")
  229. return True
  230. else:
  231. print("❌ RetrievalManager 混合搜索返回0结果")
  232. return False
  233. except Exception as e:
  234. print(f"❌ RetrievalManager 测试失败: {e}")
  235. import traceback
  236. traceback.print_exc()
  237. return False
  238. def main():
  239. """主诊断函数"""
  240. print("\n" + "="*60)
  241. print("混合检索问题诊断")
  242. print("="*60)
  243. # 配置
  244. collection_name = "first_bfp_collection_entity"
  245. query_text = "高空作业"
  246. print(f"\n诊断目标:")
  247. print(f" - Collection: {collection_name}")
  248. print(f" - 查询文本: {query_text}")
  249. # 执行诊断
  250. results = {}
  251. # 1. 检查连接
  252. results['connection'] = check_milvus_connection()
  253. if not results['connection']:
  254. print("\n❌ Milvus 连接失败,无法继续诊断")
  255. return
  256. # 2. 检查 Collection 存在
  257. results['exists'] = check_collection_exists(collection_name)
  258. if not results['exists']:
  259. print(f"\n❌ Collection '{collection_name}' 不存在,无法继续诊断")
  260. return
  261. # 3. 检查 Schema
  262. results['schema'] = check_collection_schema(collection_name)
  263. # 4. 检查数据量
  264. results['data'] = check_collection_data(collection_name)
  265. # 5. 检查索引
  266. results['index'] = check_collection_index(collection_name)
  267. # 6. 测试传统搜索
  268. results['traditional'] = test_traditional_search(collection_name, query_text)
  269. # 7. 测试 LangChain 混合搜索
  270. results['langchain'] = test_langchain_hybrid_search(collection_name, query_text)
  271. # 8. 测试 RetrievalManager
  272. results['retrieval'] = test_retrieval_manager(collection_name, query_text)
  273. # 总结
  274. print("\n" + "="*60)
  275. print("诊断总结")
  276. print("="*60)
  277. for key, value in results.items():
  278. status = "✅" if value else "❌"
  279. print(f" {status} {key}")
  280. # 给出建议
  281. print("\n" + "="*60)
  282. print("问题分析与建议")
  283. print("="*60)
  284. if not results.get('schema'):
  285. print("""
  286. ⚠️ 主要问题: Collection Schema 不支持混合搜索
  287. 原因: Collection 缺少 'dense' 和 'sparse' 字段
  288. 混合搜索需要在创建 Collection 时使用 BM25BuiltInFunction
  289. 解决方案:
  290. 1. 使用 create_hybrid_collection 方法重新创建 Collection
  291. 2. 或者修改代码,对不支持混合搜索的 Collection 使用传统向量搜索
  292. """)
  293. if results.get('traditional') and not results.get('langchain'):
  294. print("""
  295. ⚠️ 问题: 传统搜索正常,但混合搜索失败
  296. 可能原因:
  297. 1. Collection 创建时未启用 BM25 功能
  298. 2. LangChain Milvus 版本兼容性问题
  299. 3. vector_field 配置与实际字段名不匹配
  300. 建议:
  301. 1. 检查 Collection 创建方式
  302. 2. 确认 langchain-milvus 版本
  303. """)
  304. if not results.get('data'):
  305. print("""
  306. ⚠️ 问题: Collection 为空
  307. 解决方案: 先向 Collection 中导入数据
  308. """)
  309. if __name__ == "__main__":
  310. main()