test_rag_pipeline.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. RAG链路独立测试工具
  5. 用于快速调通和验证参数合规性检查的RAG检索+LLM审查功能
  6. 核心功能:
  7. 1. rag_enhanced_check() - 完整的RAG检索逻辑
  8. 2. check_parameter_compliance() - 参数合规性检查(与原链路完全一致)
  9. 使用方法:
  10. python test_rag_pipeline.py
  11. """
  12. import sys
  13. import os
  14. import json
  15. import time
  16. import asyncio
  17. from typing import Dict, Any
  18. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  19. from core.construction_review.component.infrastructure.milvus import MilvusConfig, MilvusManager
  20. from core.construction_review.component.infrastructure.parent_tool import (
  21. enhance_with_parent_docs,
  22. extract_first_result
  23. )
  24. from core.construction_review.component.reviewers.base_reviewer import BaseReviewer, ReviewResult
  25. from foundation.ai.rag.retrieval.entities_enhance import entity_enhance
  26. from foundation.ai.rag.retrieval.query_rewrite import query_rewrite_manager
  27. from foundation.ai.agent.generate.model_generate import generate_model_client
  28. from core.construction_review.component.reviewers.utils.prompt_loader import prompt_loader
  29. from foundation.observability.logger.loggering import review_logger as logger
  30. # ============================================================================
  31. # 简化的BaseReviewer类 - 用于调用LLM审查
  32. # ============================================================================
  33. class SimpleReviewer(BaseReviewer):
  34. """
  35. 简化的审查器 - 继承BaseReviewer,用于调用LLM审查
  36. """
  37. def __init__(self):
  38. """初始化简化的审查器"""
  39. super().__init__()
  40. self.model_client = generate_model_client
  41. self.prompt_loader = prompt_loader
  42. # 全局审查器实例
  43. simple_reviewer = SimpleReviewer()
  44. # ============================================================================
  45. # 核心RAG链路函数
  46. # ============================================================================
  47. def rag_enhanced_check(milvus_manager, unit_content: dict) -> dict:
  48. """
  49. RAG增强检查 - 完整链路
  50. 流程:
  51. 1. 查询提取 (query_rewrite_manager.query_extract)
  52. 2. 实体增强检索 (entity_enhance.entities_enhance_retrieval)
  53. 3. 父文档增强 (enhance_with_parent_docs)
  54. 4. 提取第一个结果 (extract_first_result)
  55. Args:
  56. milvus_manager: MilvusManager实例
  57. unit_content: 包含content字段的字典,格式: {"content": "待检索的文本内容"}
  58. Returns:
  59. dict: RAG检索结果,包含:
  60. - vector_search: 向量检索结果列表
  61. - retrieval_status: 检索状态
  62. - file_name: 参考文件名
  63. - text_content: 参考文本内容
  64. - metadata: 元数据信息
  65. """
  66. # 创建数据流跟踪字典
  67. pipeline_data = {
  68. "stage": "rag_enhanced_check",
  69. "timestamp": time.time(),
  70. "steps": {}
  71. }
  72. query_content = unit_content['content']
  73. logger.info(f"[RAG增强] 开始处理, 内容长度: {len(query_content)}")
  74. # Step 1: 查询提取
  75. logger.info("=" * 80)
  76. logger.info("Step 1: 查询提取")
  77. logger.info("=" * 80)
  78. logger.info(f"开始查询提取, 输入内容长度: {len(query_content)}")
  79. logger.info(f"输入内容预览: {query_content[:200]}...")
  80. # 执行查询提取
  81. query_pairs = query_rewrite_manager.query_extract(query_content)
  82. logger.info(f"[RAG增强] 提取到 {len(query_pairs)} 个查询对")
  83. # 打印查询对详情
  84. for idx, query_pair in enumerate(query_pairs):
  85. logger.info(f" 查询对 {idx+1}: {query_pair}")
  86. # 保存Step 1的输入输出
  87. pipeline_data["steps"]["1_query_extract"] = {
  88. "input": {
  89. "content_length": len(query_content),
  90. "content_full": query_content,
  91. "content_preview": query_content[:200]
  92. },
  93. "output": {
  94. "query_pairs_count": len(query_pairs),
  95. "query_pairs": [str(qp) for qp in query_pairs], # 转为字符串列表便于查看
  96. "extraction_timestamp": time.time()
  97. }
  98. }
  99. # Step 2: 实体增强检索
  100. logger.info("=" * 80)
  101. logger.info("Step 2: 实体增强检索")
  102. logger.info("=" * 80)
  103. logger.info(f"开始实体增强检索, 输入查询对数量: {len(query_pairs)}")
  104. # 保存输入
  105. entity_enhance_input = {
  106. "query_pairs": [str(qp) for qp in query_pairs],
  107. "query_pairs_count": len(query_pairs)
  108. }
  109. # 详细记录每个查询对的处理过程
  110. entity_enhance_process_details = []
  111. # 手动展开实体增强检索的每个步骤,便于记录数据流
  112. import asyncio
  113. def run_async(coro):
  114. """在合适的环境中运行异步函数"""
  115. try:
  116. loop = asyncio.get_running_loop()
  117. import concurrent.futures
  118. with concurrent.futures.ThreadPoolExecutor() as executor:
  119. future = executor.submit(asyncio.run, coro)
  120. return future.result()
  121. except RuntimeError:
  122. return asyncio.run(coro)
  123. # 导入retrieval_manager
  124. from foundation.ai.rag.retrieval.retrieval import retrieval_manager
  125. bfp_result_lists = []
  126. # 遍历每个查询对进行处理
  127. for idx, query_pair in enumerate(query_pairs):
  128. logger.info(f"\n{'='*60}")
  129. logger.info(f"处理查询对 {idx+1}/{len(query_pairs)}")
  130. logger.info(f"{'='*60}")
  131. # 提取查询对的各个字段
  132. entity = query_pair['entity']
  133. search_keywords = query_pair['search_keywords']
  134. background = query_pair['background']
  135. logger.info(f" 实体(entity): {entity}")
  136. logger.info(f" 搜索关键词(search_keywords): {search_keywords}")
  137. logger.info(f" 背景(background): {background}")
  138. # 记录当前查询对的输入
  139. current_query_detail = {
  140. "index": idx + 1,
  141. "input": {
  142. "entity": entity,
  143. "search_keywords": search_keywords,
  144. "background": background
  145. },
  146. "steps": {}
  147. }
  148. # Step 2.1: 实体召回 (entity_recall)
  149. logger.info(f" Step 2.1: 实体召回 (recall_top_k=5, max_results=5)")
  150. entity_list = run_async(retrieval_manager.entity_recall(
  151. entity,
  152. search_keywords,
  153. recall_top_k=5,
  154. max_results=5
  155. ))
  156. logger.info(f" ✅ 实体召回完成, 召回实体数量: {len(entity_list) if entity_list else 0}")
  157. # 记录实体召回结果
  158. current_query_detail["steps"]["2_1_entity_recall"] = {
  159. "input": {
  160. "entity": entity,
  161. "search_keywords": search_keywords,
  162. "recall_top_k": 5,
  163. "max_results": 5
  164. },
  165. "output": {
  166. "entity_list": entity_list,
  167. "entity_count": len(entity_list) if entity_list else 0
  168. }
  169. }
  170. # Step 2.2: BFP召回 (async_bfp_recall)
  171. logger.info(f" Step 2.2: BFP召回 (top_k=3)")
  172. bfp_result = run_async(retrieval_manager.async_bfp_recall(
  173. entity_list,
  174. background,
  175. top_k=3
  176. ))
  177. logger.info(f" ✅ BFP召回完成, BFP结果数量: {len(bfp_result) if bfp_result else 0}")
  178. logger.info(f" bfp_result: {bfp_result}")
  179. # 记录BFP召回结果
  180. current_query_detail["steps"]["2_2_bfp_recall"] = {
  181. "input": {
  182. "entity_list": entity_list,
  183. "background": background,
  184. "top_k": 3
  185. },
  186. "output": {
  187. "bfp_result": bfp_result,
  188. "bfp_result_count": len(bfp_result) if bfp_result else 0
  189. }
  190. }
  191. bfp_result_lists.append(bfp_result)
  192. entity_enhance_process_details.append(current_query_detail)
  193. logger.info(f"✅ 查询对 {idx+1} 处理完成")
  194. logger.info(f"\n{'='*80}")
  195. logger.info(f"实体增强检索全部完成")
  196. logger.info(f"总查询对数: {len(query_pairs)}")
  197. logger.info(f"总BFP结果数: {len(bfp_result_lists)}")
  198. logger.info(f"{'='*80}")
  199. # 保存Step 2的详细输出
  200. pipeline_data["steps"]["2_entity_enhance_retrieval"] = {
  201. "input": entity_enhance_input,
  202. "output": {
  203. "results_count": len(bfp_result_lists),
  204. "results": bfp_result_lists,
  205. "process_details": entity_enhance_process_details # 每个查询对的详细处理过程
  206. },
  207. "timestamp": time.time()
  208. }
  209. # Step 3: 检查检索结果
  210. if not bfp_result_lists:
  211. logger.warning("[RAG增强] 实体检索未返回结果")
  212. # 保存最终数据流
  213. os.makedirs(r"temp\entity_bfp_recall", exist_ok=True)
  214. with open(rf"temp\entity_bfp_recall\rag_pipeline_data.json", "w", encoding='utf-8') as f:
  215. json.dump(pipeline_data, f, ensure_ascii=False, indent=4)
  216. return {
  217. 'vector_search': [],
  218. 'retrieval_status': 'no_results',
  219. 'file_name': '',
  220. 'text_content': '',
  221. 'metadata': {}
  222. }
  223. logger.info(f"[RAG增强] 实体检索返回 {len(bfp_result_lists)} 个结果")
  224. # Step 4: 父文档增强 (使用独立工具函数)
  225. logger.info("=" * 80)
  226. logger.info("Step 3: 父文档增强")
  227. logger.info("=" * 80)
  228. try:
  229. enhancement_result = enhance_with_parent_docs(milvus_manager, bfp_result_lists)
  230. enhanced_results = enhancement_result['enhanced_results']
  231. enhanced_count = enhancement_result['enhanced_count']
  232. parent_docs = enhancement_result['parent_docs']
  233. # 保存Step 3输出
  234. pipeline_data["steps"]["3_parent_doc_enhancement"] = {
  235. "input": {
  236. "bfp_results_count": len(bfp_result_lists)
  237. },
  238. "output": {
  239. "enhanced_count": enhanced_count,
  240. "parent_docs_count": len(parent_docs),
  241. "parent_docs": parent_docs,
  242. "enhanced_results": enhanced_results
  243. }
  244. }
  245. # 保存增强后的结果
  246. os.makedirs(r"temp\entity_bfp_recall", exist_ok=True)
  247. with open(rf"temp\entity_bfp_recall\enhance_with_parent_docs.json", "w", encoding='utf-8') as f:
  248. json.dump(enhanced_results, f, ensure_ascii=False, indent=4)
  249. logger.info(f"[RAG增强] 成功增强 {enhanced_count} 个结果")
  250. logger.info(f"[RAG增强] 使用了 {len(parent_docs)} 个父文档")
  251. # 打印父文档信息
  252. for idx, parent_doc in enumerate(parent_docs):
  253. logger.info(f" 父文档 {idx+1}: {parent_doc.get('file_name', 'unknown')}")
  254. except Exception as e:
  255. logger.error(f"[RAG增强] 父文档增强失败: {e}", exc_info=True)
  256. # 保存错误信息
  257. pipeline_data["steps"]["3_parent_doc_enhancement"] = {
  258. "input": {
  259. "bfp_results_count": len(bfp_result_lists)
  260. },
  261. "output": {
  262. "error": str(e),
  263. "error_type": type(e).__name__
  264. }
  265. }
  266. # 失败时使用原始结果
  267. enhanced_results = bfp_result_lists
  268. parent_docs = []
  269. # Step 5: 提取第一个结果返回 (使用增强后的结果)
  270. logger.info("=" * 80)
  271. logger.info("Step 4: 提取第一个结果")
  272. logger.info("=" * 80)
  273. final_result = extract_first_result(enhanced_results)
  274. # 保存Step 4输出
  275. pipeline_data["steps"]["4_extract_first_result"] = {
  276. "input": {
  277. "enhanced_results_count": len(enhanced_results)
  278. },
  279. "output": {
  280. "final_result": final_result
  281. }
  282. }
  283. # 保存最终结果用于调试
  284. with open(rf"temp\entity_bfp_recall\extract_first_result.json", "w", encoding='utf-8') as f:
  285. json.dump(final_result, f, ensure_ascii=False, indent=4)
  286. # 保存完整数据流
  287. pipeline_data["final_result"] = final_result
  288. os.makedirs(r"temp\entity_bfp_recall", exist_ok=True)
  289. with open(rf"temp\entity_bfp_recall\rag_pipeline_data.json", "w", encoding='utf-8') as f:
  290. json.dump(pipeline_data, f, ensure_ascii=False, indent=4)
  291. logger.info(f"[RAG增强] 最终提取结果文件名: {final_result.get('file_name', '无')}")
  292. logger.info(f"[RAG增强] 最终提取结果内容长度: {len(final_result.get('text_content', ''))}")
  293. logger.info(f"[RAG增强] 完整数据流已保存到: temp/entity_bfp_recall/rag_pipeline_data.json")
  294. return final_result
  295. # ============================================================================
  296. # 参数合规性检查函数 (与原链路完全一致)
  297. # ============================================================================
  298. async def check_parameter_compliance(trace_id_idx: str, review_content: str, review_references: str,
  299. reference_source: str, review_location_label: str, state: str, stage_name: str) -> Dict[str, Any]:
  300. """
  301. 参数合规性检查 - 实体概念/工程术语知识库
  302. (与原链路完全一致的方法签名和实现)
  303. Args:
  304. trace_id_idx: 追踪ID索引
  305. review_content: 审查内容
  306. review_references: 审查参考信息
  307. reference_source: 参考来源
  308. review_location_label: 审查位置标签
  309. state: 状态字典
  310. stage_name: 阶段名称
  311. Returns:
  312. Dict[str, Any]: 参数合规性检查结果
  313. """
  314. # 从原链路导入Stage枚举
  315. from core.construction_review.component.ai_review_engine import Stage
  316. reviewer_type = Stage.TECHNICAL.value['reviewer_type']
  317. prompt_name = Stage.TECHNICAL.value['parameter']
  318. trace_id = prompt_name + trace_id_idx
  319. # 直接调用原链路的review方法
  320. return await simple_reviewer.review("parameter_compliance_check", trace_id, reviewer_type, prompt_name, review_content, review_references,
  321. reference_source, review_location_label, state, stage_name, timeout=45)
  322. # ============================================================================
  323. # 主测试函数
  324. # ============================================================================
  325. async def main():
  326. """
  327. 主测试函数 - 测试参数合规性检查的完整流程
  328. 流程:
  329. 1. 初始化Milvus Manager
  330. 2. 准备测试内容
  331. 3. 调用RAG获取参考信息
  332. 4. 调用参数合规性检查(与原链路一致)
  333. 5. 保存完整数据流
  334. """
  335. print("\n" + "=" * 80)
  336. print("RAG链路独立测试工具 - 参数合规性检查".center(80))
  337. print("=" * 80 + "\n")
  338. # 初始化Milvus Manager
  339. print("📌 初始化Milvus Manager...")
  340. logger.info("初始化Milvus Manager...")
  341. try:
  342. milvus_manager = MilvusManager(MilvusConfig())
  343. print("✅ Milvus Manager 初始化成功\n")
  344. except Exception as e:
  345. print(f"❌ Milvus Manager 初始化失败: {e}")
  346. logger.error(f"Milvus Manager 初始化失败: {e}", exc_info=True)
  347. return
  348. # 测试内容
  349. test_content = """主要部件说明
  350. 1、主梁总成
  351. 主梁总成由主梁和导梁构成。主梁单节长12m,共7节,每节重10.87t,主梁为主要承载受力构件,其上弦杆上方设有轨道供纵移桁车走行,实现预制梁的纵向移动;下弦设有反滚轮行走轨道,作为导梁纵移、前中支腿移动纵行轨道。导梁长18m,主要是为降低过孔挠度和承受中支腿移动荷载,起安全引导、辅助过孔作用。主梁、导梁为三角桁架构件单元,采用销轴连接,前、后端各设置横联构架。
  352. 图4-1 主梁总成图
  353. 注意事项:
  354. (1)更换上、下弦销轴时,应优先向设备供应方购买符合要求的备件。自行更换时,材料性能必须优于设计零件性能,并按规定进行热处理,否则可能造成人员、设备事故。
  355. (2)销轴不得弯曲受力,不得用销轴作为锤砸工具,不得任意放置及焊接"""
  356. unit_content = {"content": test_content}
  357. print(f"📝 测试内容长度: {len(test_content)} 字符")
  358. print(f"📝 测试内容预览:\n{test_content[:200]}...\n")
  359. # 创建数据流跟踪字典
  360. pipeline_data = {
  361. "stage": "parameter_compliance_check_full_pipeline",
  362. "timestamp": time.time(),
  363. "steps": {}
  364. }
  365. # Step 1: RAG增强检索
  366. print("=" * 80)
  367. print("【Step 1】RAG增强检索".center(80))
  368. print("=" * 80)
  369. logger.info("=" * 80)
  370. logger.info("Step 1: RAG增强检索")
  371. logger.info("=" * 80)
  372. start_time = time.time()
  373. rag_result = rag_enhanced_check(milvus_manager, unit_content)
  374. review_references = rag_result.get('text_content', '')
  375. reference_source = rag_result.get('file_name', '')
  376. # 保存Step 1数据
  377. pipeline_data["steps"]["1_rag_retrieval"] = {
  378. "input": {
  379. "unit_content": unit_content
  380. },
  381. "output": {
  382. "rag_result": rag_result,
  383. "review_references_length": len(review_references),
  384. "reference_source": reference_source
  385. },
  386. "execution_time": time.time() - start_time
  387. }
  388. if not review_references:
  389. logger.warning("RAG检索未返回参考信息,将继续使用空参考进行审查")
  390. print("⚠️ RAG检索未返回参考信息\n")
  391. else:
  392. print(f"✅ RAG检索成功")
  393. print(f" 参考来源: {reference_source}")
  394. print(f" 参考内容长度: {len(review_references)} 字符\n")
  395. # Step 2: 调用参数合规性检查 (使用原链路的方法)
  396. print("=" * 80)
  397. print("【Step 2】参数合规性检查 (LLM审查)".center(80))
  398. print("=" * 80)
  399. logger.info("=" * 80)
  400. logger.info("Step 2: 参数合规性检查")
  401. logger.info("=" * 80)
  402. trace_id_idx = "_test_001"
  403. review_location_label = "测试文档-第1章"
  404. state = None
  405. stage_name = "test_stage"
  406. logger.info(f"开始调用参数合规性检查")
  407. logger.info(f" - trace_id_idx: {trace_id_idx}")
  408. logger.info(f" - review_content长度: {len(test_content)}")
  409. logger.info(f" - review_references长度: {len(review_references)}")
  410. logger.info(f" - reference_source: {reference_source}")
  411. # 保存Step 2输入
  412. pipeline_data["steps"]["2_parameter_compliance_check"] = {
  413. "input": {
  414. "trace_id_idx": trace_id_idx,
  415. "review_content_length": len(test_content),
  416. "review_content_preview": test_content[:200],
  417. "review_references_length": len(review_references),
  418. "review_references_preview": review_references[:200] if review_references else "",
  419. "reference_source": reference_source,
  420. "review_location_label": review_location_label,
  421. "stage_name": stage_name
  422. },
  423. "output": {}
  424. }
  425. start_time = time.time()
  426. try:
  427. # 调用与原链路完全一致的方法
  428. result = await check_parameter_compliance(
  429. trace_id_idx=trace_id_idx,
  430. review_content=test_content,
  431. review_references=review_references,
  432. reference_source=reference_source,
  433. review_location_label=review_location_label,
  434. state=state,
  435. stage_name=stage_name
  436. )
  437. elapsed_time = time.time() - start_time
  438. # 保存Step 2输出
  439. pipeline_data["steps"]["2_parameter_compliance_check"]["output"] = {
  440. "success": result.success,
  441. "execution_time": result.execution_time,
  442. "error_message": result.error_message,
  443. "details": result.details
  444. }
  445. # 保存完整数据流
  446. pipeline_data["final_result"] = {
  447. "success": result.success,
  448. "execution_time": result.execution_time,
  449. "error_message": result.error_message,
  450. "details": result.details
  451. }
  452. os.makedirs(r"temp\entity_bfp_recall", exist_ok=True)
  453. with open(rf"temp\entity_bfp_recall\parameter_compliance_full_pipeline.json", "w", encoding='utf-8') as f:
  454. json.dump(pipeline_data, f, ensure_ascii=False, indent=4)
  455. logger.info(f"✅ 参数合规性检查完成, 总耗时: {elapsed_time:.2f}秒")
  456. logger.info(f"📁 完整数据流已保存到: temp/entity_bfp_recall/parameter_compliance_full_pipeline.json")
  457. except Exception as e:
  458. error_msg = f"参数合规性检查失败: {str(e)}"
  459. logger.error(error_msg, exc_info=True)
  460. # 保存错误信息
  461. pipeline_data["steps"]["2_parameter_compliance_check"]["output"] = {
  462. "error": error_msg,
  463. "error_type": type(e).__name__,
  464. "traceback": str(e)
  465. }
  466. pipeline_data["error"] = {
  467. "error_message": error_msg,
  468. "error_type": type(e).__name__
  469. }
  470. os.makedirs(r"temp\entity_bfp_recall", exist_ok=True)
  471. with open(rf"temp\entity_bfp_recall\parameter_compliance_full_pipeline.json", "w", encoding='utf-8') as f:
  472. json.dump(pipeline_data, f, ensure_ascii=False, indent=4)
  473. print(f"❌ 参数合规性检查失败: {error_msg}\n")
  474. return
  475. # 输出测试结果
  476. print("\n" + "=" * 80)
  477. print("测试结果".center(80))
  478. print("=" * 80)
  479. status_icon = "✅" if result.success else "❌"
  480. print(f"\n{status_icon} 参数合规性检查")
  481. print(f" 执行时间: {result.execution_time:.2f}秒")
  482. if result.success:
  483. print(f" 审查成功!")
  484. print(f" 详细信息: {result.details.get('name', 'N/A')}")
  485. # 如果有RAG参考信息,打印出来
  486. if 'rag_reference_source' in result.details:
  487. print(f"\n 📚 RAG参考信息:")
  488. print(f" 参考来源: {result.details['rag_reference_source']}")
  489. print(f" 参考内容长度: {len(result.details.get('rag_review_references', ''))} 字符")
  490. # 打印审查响应(截取前500字符)
  491. response = result.details.get('response', '')
  492. if response:
  493. print(f"\n 📋 审查响应 (前500字符):")
  494. print(f" {response[:500]}...")
  495. else:
  496. print(f" 错误信息: {result.error_message}")
  497. # 输出文件位置
  498. print("\n" + "=" * 80)
  499. print("详细结果已保存到:".center(80))
  500. print(" 📁 temp/entity_bfp_recall/rag_pipeline_data.json - RAG检索完整数据流")
  501. print(" 📁 temp/entity_bfp_recall/enhance_with_parent_docs.json - 父文档增强结果")
  502. print(" 📁 temp/entity_bfp_recall/extract_first_result.json - 最终提取结果")
  503. print(" 📁 temp/entity_bfp_recall/parameter_compliance_full_pipeline.json - 参数检查完整数据流")
  504. print("=" * 80 + "\n")
  505. print("✅ 测试完成!")
  506. # 保存测试结果摘要
  507. os.makedirs(r"temp\entity_bfp_recall", exist_ok=True)
  508. test_summary = {
  509. "test_type": "parameter_compliance",
  510. "check_display_name": "参数合规性检查",
  511. "timestamp": time.time(),
  512. "result": {
  513. 'success': result.success,
  514. 'execution_time': result.execution_time,
  515. 'error_message': result.error_message,
  516. 'details_summary': {
  517. 'name': result.details.get('name'),
  518. 'has_rag_reference': 'rag_reference_source' in result.details,
  519. 'response_length': len(result.details.get('response', '')),
  520. 'response_preview': result.details.get('response', '')[:200]
  521. }
  522. }
  523. }
  524. with open(rf"temp\entity_bfp_recall\test_summary.json", "w", encoding='utf-8') as f:
  525. json.dump(test_summary, f, ensure_ascii=False, indent=4)
  526. return result
  527. if __name__ == "__main__":
  528. # 运行异步主函数
  529. asyncio.run(main())