| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
"""
Dump actual review results: write the content really retrieved by the RAG
pipeline to a human-readable file.
"""
- import sys, os, json
# Climb three directory levels up from this file and make that directory
# importable, so the absolute ``utils_test...`` imports that follow resolve
# when the script is launched directly.
project_root = os.path.abspath(__file__)
for _ in range(3):
    project_root = os.path.dirname(project_root)
if project_root not in sys.path:
    # Prepend so this checkout takes precedence over other entries on the path.
    sys.path.insert(0, project_root)
- from utils_test.RAG_Pipeline_Test.test_data import TEST_SAMPLES
- from utils_test.RAG_Pipeline_Test.rag_pipeline_runner import RAGPipelineRunner
def _excerpt(text, limit):
    """Return ``(head, was_cut)`` for the first ``limit`` characters of ``text``.

    ``None`` is treated as an empty string so a missing field cannot crash the
    report. ``was_cut`` is True only when characters were actually dropped.
    """
    text = text or ""
    return text[:limit], len(text) > limit


def _point_label(point):
    """Return the display label of a review point (``label``, else ``entity``)."""
    return point.get('label', point.get('entity', ''))


def _render_review_points(review_points):
    """Return the report lines describing every extracted review point.

    Each review point is read as a dict; the newer keys (``search_queries``,
    ``original_text``) are preferred and the older ones (``search_keywords``,
    ``background``) are used only when the newer key is absent.
    """
    lines = []
    for number, point in enumerate(review_points, start=1):
        # ``or []`` guards against an explicit None stored under the key.
        queries = point.get('search_queries', point.get('search_keywords', [])) or []
        original = point.get('original_text', point.get('background', ''))
        parameter = point.get('parameter', '')

        lines.append(f"\n 要点 [{number}] {_point_label(point)}")
        lines.append(" ├─ 检索语句:")
        lines.extend(f" │ • {query}" for query in queries)
        lines.append(f" ├─ 原文摘录: {original}")
        lines.append(f" └─ 技术参数: {parameter}")
    return lines


def _render_retrieval(review_points, retrieval_results, top_k=3, text_limit=200):
    """Return the report lines for the retrieval hits of each review point.

    Points and result lists are paired positionally with ``zip``, so any
    surplus entries on either side are ignored. Only the first ``top_k`` hits
    of each point are shown, each clipped to ``text_limit`` characters.
    """
    lines = []
    paired = zip(review_points, retrieval_results)
    for number, (point, hits) in enumerate(paired, start=1):
        lines.append(f"\n ── 要点 [{number}] {_point_label(point)} 的检索结果 ──")
        if not hits:
            lines.append(" (无结果)")
            continue

        for rank, hit in enumerate(hits[:top_k], start=1):
            # A score stored as None would break the ``:.4f`` format below.
            rerank = hit.get('rerank_score') or 0
            bfp = hit.get('bfp_rerank_score') or 0
            text, was_cut = _excerpt(hit.get('text_content'), text_limit)
            meta = hit.get('metadata') or {}
            file_name = meta.get('file_name', '')

            lines.append(f"\n [{rank}] rerank={rerank:.4f}, bfp_rerank={bfp:.4f}")
            if file_name:
                lines.append(f" 来源: {file_name}")
            # Only mark the text as elided when it really was shortened.
            lines.append(f" 内容: {text}{'...' if was_cut else ''}")
    return lines


def _render_sample(runner, sample):
    """Run the pipeline on one sample and return its report lines.

    ``sample`` must provide ``chunk_id``, ``content`` and ``section_label``.
    ``runner.run_single`` is called once; its result is expected to expose
    ``review_points``, ``review_point_count``, ``extract_time``,
    ``retrieval_results``, ``non_empty_pairs`` and ``retrieval_time``.
    Rendering stops early (with a marker line) when extraction or retrieval
    produced nothing.
    """
    chunk_id = sample["chunk_id"]
    content = sample["content"]
    banner = "=" * 80
    rule = "─" * 60

    lines = [
        banner,
        f"测试样本: {chunk_id}",
        f"章节: {sample['section_label']}",
        banner,
    ]

    # Show the first 500 characters of the source text.
    head, was_cut = _excerpt(content, 500)
    lines.append("\n【原文摘录 (前500字)】")
    lines.append(head)
    if was_cut:
        lines.append("...")

    # Execute the pipeline.
    result = runner.run_single(content, chunk_id=chunk_id)

    # Extracted review points.
    lines.append(f"\n{rule}")
    lines.append(
        f"【审查要点提取结果】共 {result.review_point_count} 个要点, "
        f"耗时 {result.extract_time:.1f}s"
    )
    lines.append(rule)
    if not result.review_points:
        lines.append(" (提取失败)")
        return lines
    lines.extend(_render_review_points(result.review_points))

    # Retrieval results.
    lines.append(f"\n{rule}")
    lines.append(
        f"【RAG 检索结果】{result.non_empty_pairs}/{result.review_point_count} "
        f"个查询对有结果, 耗时 {result.retrieval_time:.1f}s"
    )
    lines.append(rule)
    if not result.retrieval_results:
        lines.append(" (检索失败)")
        return lines
    lines.extend(_render_retrieval(result.review_points, result.retrieval_results))
    return lines


def main():
    """Run the RAG pipeline on a few samples and write a detailed text report.

    Three representative samples are selected from ``TEST_SAMPLES`` by
    ``chunk_id`` (in the order they appear in ``TEST_SAMPLES``). The report is
    written as UTF-8 to ``reports/review_results_detail.txt`` next to this
    file; the ``reports`` directory is created when missing. The output path
    is printed once the file has been written.
    """
    runner = RAGPipelineRunner()

    # Three representative samples chosen for in-depth analysis.
    target_ids = ("safety_Monitoring", "technology_Operations", "safety_Emergency")
    samples = [s for s in TEST_SAMPLES if s["chunk_id"] in target_ids]

    output_lines = []
    for sample in samples:
        output_lines.extend(_render_sample(runner, sample))
        # Always separate samples, including those that stopped early.
        output_lines.append("\n\n")

    # Write the report to disk.
    output_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "reports",
        "review_results_detail.txt",
    )
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("\n".join(output_lines))
    print(f"审查结果已输出: {output_path}")


if __name__ == "__main__":
    main()
|