"""
Completeness-review comparison test.

Compares two approaches:
  Method A (current):     classify first (LLM three-level classification)
                          -> decide completeness with set operations
                          -> LLM-generated recommendations
  Method B (alternative): direct LLM interpretation; one pass outputs
                          compliance + evidence + reason

Evaluation dimensions:
  1. Accuracy:    coverage / false-positive rate against manual annotation
  2. Consistency: stability of results across repeated runs on the same input
  3. Latency:     end-to-end elapsed time
  4. Quality:     actionability of recommendations, accuracy of evidence
  5. Cost:        number of LLM calls, token consumption

Usage:
  cd <project root>
  $env:PYTHONPATH = (Get-Location)
  python utils_test/Completeness_Compare_Test/compare_test.py
"""

import asyncio
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from dataclasses import dataclass, field

# Make the project root importable when the script is run directly, so the
# project-local imports below resolve without PYTHONPATH being set.
PROJECT_ROOT = str(Path(__file__).parent.parent.parent)
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from foundation.observability.logger.loggering import review_logger as logger  # noqa: E402
from utils_test.Completeness_Compare_Test.method_b_direct_llm import (  # noqa: E402
    run_direct_llm_check,
    direct_result_to_dict,
    DirectCheckResult,
)


# ─────────────────────────────────────────────────────────────────────────────
# Data loading
# ─────────────────────────────────────────────────────────────────────────────

def load_final_result(json_path: str) -> Dict[str, Any]:
    """Load a final_result JSON file and return the parsed object."""
    with open(json_path, "r", encoding="utf-8") as f:
        return json.load(f)


def extract_chunks_by_chapter(
    data: Dict[str, Any], chapter_code: str
) -> List[Dict[str, Any]]:
    """Return every chunk whose ``chapter_classification`` equals *chapter_code*.

    Chunks are read from ``data["document_result"]["structured_content"]["chunks"]``;
    any missing level yields an empty list.
    """
    structured = data.get("document_result", {}).get("structured_content", {})
    all_chunks = structured.get("chunks", [])
    return [
        c for c in all_chunks
        if c.get("chapter_classification") == chapter_code
    ]


def get_all_chapter_codes(data: Dict[str, Any]) -> List[str]:
    """Return the distinct first-level chapter codes in first-seen order.

    Empty codes and the codes ``quality_check``, ``catalog`` and ``metadata``
    are excluded.
    """
    structured = data.get("document_result", {}).get("structured_content", {})
    all_chunks = structured.get("chunks", [])
    excluded = ("quality_check", "catalog", "metadata")
    codes: List[str] = []
    seen = set()
    for c in all_chunks:
        code = c.get("chapter_classification", "")
        if code and code not in seen and code not in excluded:
            codes.append(code)
            seen.add(code)
    return codes


def load_standard_items_for_chapter(
    csv_path: str, chapter_code: str
) -> List[Dict[str, Any]]:
    """Load the standard third-level items of one chapter from the category CSV.

    The file is tried with several encodings in turn. Column names are
    normalised (stripped, lower-cased, spaces replaced by underscores) before
    rows are filtered on ``first_code == chapter_code``.

    Args:
        csv_path: Path to StandardCategoryTable.csv.
        chapter_code: First-level chapter code to select.

    Returns:
        One dict per matching row with the keys ``first_code``, ``first_name``,
        ``second_code``, ``second_name``, ``third_code``, ``third_name`` and
        ``third_focus``; all values are stripped strings, empty for blank cells.

    Raises:
        ValueError: If the file cannot be decoded with any candidate encoding.
    """
    import pandas as pd

    encodings = ['utf-8-sig', 'utf-16', 'gbk', 'utf-8']
    df = None
    for enc in encodings:
        try:
            df = pd.read_csv(csv_path, encoding=enc, sep=None, engine='python')
            break
        except UnicodeError:
            # Catch the base class: the utf-16 decoder raises a plain
            # UnicodeError (not UnicodeDecodeError) when the stream has no BOM,
            # which would otherwise abort before 'gbk' is ever tried.
            continue
    if df is None:
        raise ValueError(f"无法读取CSV: {csv_path}")

    df.columns = [c.strip().lower().replace(' ', '_') for c in df.columns]

    def _cell(row, column: str) -> str:
        """Return the cell as a stripped string; blank (None/NaN) cells become ''."""
        value = row.get(column, '')
        # NaN is the only float that is not equal to itself.
        if value is None or (isinstance(value, float) and value != value):
            return ''
        return str(value).strip()

    items = []
    for _, row in df.iterrows():
        fc = _cell(row, 'first_code')
        if fc != chapter_code:
            continue
        items.append({
            "first_code": fc,
            "first_name": _cell(row, 'first_name'),
            "second_code": _cell(row, 'second_code'),
            "second_name": _cell(row, 'second_name'),
            "third_code": _cell(row, 'third_code'),
            "third_name": _cell(row, 'third_name'),
            "third_focus": _cell(row, 'third_focus'),
        })
    return items


# ─────────────────────────────────────────────────────────────────────────────
# Method A: current system (reuses LightweightCompletenessChecker)
# ─────────────────────────────────────────────────────────────────────────────

async def run_method_a(
    chunks: List[Dict[str, Any]],
    csv_path: str,
    chapter_code: str,
    outline: Optional[List[Dict]] = None,
    model_client=None,
) -> Tuple[Dict[str, Any], float, int]:
    """Run method A: classify first, then compare against the standard.

    Args:
        chunks: Chunks of the chapter under test.
        csv_path: Path to the standard category CSV handed to the checker.
        chapter_code: Chapter code; the value ``"all"`` is passed to the
            checker as ``None``.
        outline: Optional outline forwarded to the checker.
        model_client: Optional model client forwarded to the checker.

    Returns:
        (result_dict, execution_time, llm_call_count)
    """
    from core.construction_review.component.reviewers.completeness_reviewer import (
        LightweightCompletenessChecker,
        result_to_dict,
    )

    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments.
    start_time = time.perf_counter()

    checker = LightweightCompletenessChecker(csv_path, model_client=model_client)
    result = await checker.check(
        chunks=chunks,
        outline=outline,
        chapter_classification=chapter_code if chapter_code != "all" else None,
    )
    result_dict = result_to_dict(result)
    execution_time = time.perf_counter() - start_time

    # Method A only calls the LLM when generating recommendations
    # (issue_point/reason are template-built), so the call count is estimated
    # as the number of recommendations whose level is not "通过".
    llm_call_count = sum(
        1 for r in result_dict.get("recommendations", [])
        if r.get("level") != "通过"
    )

    return result_dict, execution_time, llm_call_count


# ─────────────────────────────────────────────────────────────────────────────
# Comparison
# ─────────────────────────────────────────────────────────────────────────────

@dataclass
class CompareResult:
    """Comparison result for a single chapter."""
    chapter_code: str
    chapter_name: str

    # Method A results
    a_total_required: int = 0
    a_present: int = 0
    a_missing: int = 0
    a_completeness_rate: float = 0.0
    a_execution_time: float = 0.0
    a_llm_calls: int = 0
    a_missing_details: List[Dict] = field(default_factory=list)
    a_recommendations: List[Dict] = field(default_factory=list)

    # Method B results
    b_total_required: int = 0
    b_covered: int = 0
    b_missing: int = 0
    b_completeness_rate: float = 0.0
    b_execution_time: float = 0.0
    b_llm_calls: int = 0
    b_items: List[Dict] = field(default_factory=list)

    # Difference analysis
    agreement_count: int = 0      # items on which both methods reach the same verdict
    disagreement_count: int = 0   # items on which the methods disagree
    a_only_missing: List[str] = field(default_factory=list)  # missing per A, covered per B
    b_only_missing: List[str] = field(default_factory=list)  # missing per B, covered per A


def compare_results(
    chapter_code: str,
    chapter_name: str,
    method_a: Dict[str, Any],
    method_b: DirectCheckResult,
    a_time: float,
    a_llm_calls: int,
) -> CompareResult:
    """Combine the outputs of both methods into one ``CompareResult``.

    Args:
        chapter_code: Chapter code being compared.
        chapter_name: Display name of the chapter.
        method_a: Result dict of method A; ``tertiary_completeness`` and
            ``recommendations`` are read from it.
        method_b: Result object of method B.
        a_time: Elapsed time of method A in seconds.
        a_llm_calls: LLM call count attributed to method A.

    Returns:
        The populated comparison record, including per-code disagreements.
    """
    cr = CompareResult(chapter_code=chapter_code, chapter_name=chapter_name)

    # Method A statistics
    tertiary = method_a.get("tertiary_completeness", {})
    cr.a_total_required = tertiary.get("total", 0)
    cr.a_present = tertiary.get("present", 0)
    cr.a_missing = tertiary.get("missing", 0)
    # str() keeps this working whether the rate arrives as "85.0%" or as a number.
    rate_str = str(tertiary.get("completeness_rate", "0%")).strip().rstrip("%")
    try:
        cr.a_completeness_rate = float(rate_str)
    except ValueError:
        cr.a_completeness_rate = 0.0
    cr.a_execution_time = a_time
    cr.a_llm_calls = a_llm_calls
    cr.a_missing_details = tertiary.get("missing_details", [])
    cr.a_recommendations = method_a.get("recommendations", [])

    # Method B statistics
    cr.b_total_required = method_b.total_required
    cr.b_covered = method_b.covered_count
    cr.b_missing = method_b.missing_count
    cr.b_completeness_rate = method_b.completeness_rate
    cr.b_execution_time = method_b.execution_time
    cr.b_llm_calls = method_b.llm_call_count
    cr.b_items = [
        {
            "standard_code": item.standard_code,
            "standard_name": item.standard_name,
            "is_covered": item.is_covered,
            "evidence": item.evidence,
            "reason": item.reason,
            "confidence": item.confidence,
        }
        for item in method_b.items
    ]

    # Difference analysis. Entries without a code are skipped rather than
    # raising, matching how generate_report reads the same dicts with .get().
    a_missing_codes = {
        d["tertiary_code"] for d in cr.a_missing_details
        if d.get("tertiary_code")
    }
    b_missing_codes = {
        item.standard_code for item in method_b.items if not item.is_covered
    }

    cr.a_only_missing = sorted(a_missing_codes - b_missing_codes)
    cr.b_only_missing = sorted(b_missing_codes - a_missing_codes)
    cr.disagreement_count = len(cr.a_only_missing) + len(cr.b_only_missing)

    # Agreement covers BOTH kinds of matching verdicts: items neither method
    # flags (both present) and items both methods flag (both missing).
    flagged_by_any = a_missing_codes | b_missing_codes
    flagged_by_both = a_missing_codes & b_missing_codes
    both_present = max(cr.a_total_required - len(flagged_by_any), 0)
    cr.agreement_count = both_present + len(flagged_by_both)

    return cr


# ─────────────────────────────────────────────────────────────────────────────
# Report generation
# ─────────────────────────────────────────────────────────────────────────────

def generate_report(
    compare_results: List[CompareResult], output_path: str
) -> str:
    """Render the comparison report, write it to *output_path* and return it.

    The report has three parts: an overview table with one row per chapter
    plus totals, a per-chapter detail section (missing items of each method
    and the disagreements, each list truncated), and a conclusion with the
    overall agreement rate and total elapsed times.

    Args:
        compare_results: Per-chapter comparison records. (The parameter name
            shadows the module-level function inside this body only.)
        output_path: File the report text is written to (UTF-8).

    Returns:
        The full report text.
    """
    lines = []
    lines.append("=" * 80)
    lines.append("完整性审查方案对比报告")
    lines.append("=" * 80)
    lines.append(f"生成时间: {time.strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append(f"对比章节数: {len(compare_results)}")
    lines.append("")

    # ── Overview table ──
    lines.append("─" * 80)
    lines.append("【总览】")
    lines.append("─" * 80)
    lines.append(f"{'章节':<15} {'A完整率':>8} {'B完整率':>8} {'A耗时':>8} {'B耗时':>8} "
                 f" {'A缺失':>6} {'B缺失':>6} {'一致':>6} {'分歧':>6}")
    lines.append("─" * 80)

    total_a_time = 0
    total_b_time = 0
    total_agree = 0
    total_disagree = 0

    for cr in compare_results:
        lines.append(
            f"{cr.chapter_code:<15} "
            f"{cr.a_completeness_rate:>7.1f}% "
            f"{cr.b_completeness_rate:>7.1f}% "
            f"{cr.a_execution_time:>7.2f}s "
            f"{cr.b_execution_time:>7.2f}s "
            f"{cr.a_missing:>6} "
            f"{cr.b_missing:>6} "
            f"{cr.agreement_count:>6} "
            f"{cr.disagreement_count:>6}"
        )
        total_a_time += cr.a_execution_time
        total_b_time += cr.b_execution_time
        total_agree += cr.agreement_count
        total_disagree += cr.disagreement_count

    lines.append("─" * 80)
    lines.append(
        f"{'合计':<15} {'':>8} {'':>8} "
        f"{total_a_time:>7.2f}s "
        f"{total_b_time:>7.2f}s "
        f"{'':>6} {'':>6} "
        f"{total_agree:>6} "
        f"{total_disagree:>6}"
    )
    lines.append("")

    # ── Per-chapter details ──
    for cr in compare_results:
        lines.append("─" * 80)
        lines.append(f"【{cr.chapter_code}】{cr.chapter_name}")
        lines.append("─" * 80)

        lines.append(f" 方案A: 缺失 {cr.a_missing}/{cr.a_total_required} "
                     f"({cr.a_completeness_rate:.1f}%), 耗时 {cr.a_execution_time:.2f}s, "
                     f"LLM调用 {cr.a_llm_calls}次")
        lines.append(f" 方案B: 缺失 {cr.b_missing}/{cr.b_total_required} "
                     f"({cr.b_completeness_rate:.1f}%), 耗时 {cr.b_execution_time:.2f}s, "
                     f"LLM调用 {cr.b_llm_calls}次")
        lines.append("")

        # Items method A considers missing (first 10 shown)
        if cr.a_missing_details:
            lines.append(" 方案A认为缺失:")
            for d in cr.a_missing_details[:10]:
                lines.append(f" - [{d.get('tertiary_code', '')}] {d.get('tertiary_name', '')}")
            if len(cr.a_missing_details) > 10:
                lines.append(f" ... 共 {len(cr.a_missing_details)} 项")
            lines.append("")

        # Items method B considers missing (first 10 shown)
        b_missing_items = [item for item in cr.b_items if not item["is_covered"]]
        if b_missing_items:
            lines.append(" 方案B认为缺失:")
            for item in b_missing_items[:10]:
                # Coerce before slicing so a None reason cannot raise TypeError.
                reason_text = str(item['reason'] or '')
                lines.append(f" - [{item['standard_code']}] {item['standard_name']}")
                lines.append(f" 原因: {reason_text[:80]}")
            if len(b_missing_items) > 10:
                lines.append(f" ... 共 {len(b_missing_items)} 项")
            lines.append("")

        # Disagreements: missing per A only (first 5 shown, with B's evidence)
        if cr.a_only_missing:
            lines.append(" 分歧:仅方案A认为缺失(B认为已覆盖):")
            for code in cr.a_only_missing[:5]:
                b_item = next((i for i in cr.b_items if i["standard_code"] == code), None)
                if b_item:
                    evidence_text = str(b_item['evidence'] or '')
                    lines.append(f" - [{code}] {b_item['standard_name']}")
                    lines.append(f" B的证据: {evidence_text[:100]}")
            lines.append("")

        # Disagreements: missing per B only (first 5 shown, with B's reason)
        if cr.b_only_missing:
            lines.append(" 分歧:仅方案B认为缺失(A认为已覆盖):")
            for code in cr.b_only_missing[:5]:
                b_item = next((i for i in cr.b_items if i["standard_code"] == code), None)
                if b_item:
                    reason_text = str(b_item['reason'] or '')
                    lines.append(f" - [{code}] {b_item['standard_name']}")
                    lines.append(f" B的原因: {reason_text[:100]}")
            lines.append("")

    # ── Conclusion ──
    lines.append("=" * 80)
    lines.append("【对比结论】")
    lines.append("=" * 80)
    lines.append(f" 总判断一致数: {total_agree}")
    lines.append(f" 总分歧数: {total_disagree}")
    if total_agree + total_disagree > 0:
        agree_rate = total_agree / (total_agree + total_disagree) * 100
        lines.append(f" 一致率: {agree_rate:.1f}%")
    lines.append(f" 方案A总耗时: {total_a_time:.2f}s")
    lines.append(f" 方案B总耗时: {total_b_time:.2f}s")
    lines.append("")

    report_text = "\n".join(lines)

    # Persist the report
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(report_text)

    return report_text


# ─────────────────────────────────────────────────────────────────────────────
# Main test flow
# ─────────────────────────────────────────────────────────────────────────────

async def run_comparison(
    json_path: str,
    csv_path: str,
    chapter_codes: Optional[List[str]] = None,
    output_dir: Optional[str] = None,
):
    """Run the full comparison test.

    For every chapter, both methods are executed on the same chunks, their
    results are compared, and a ``detail_<chapter>.json`` file is written.
    Afterwards a text report and a summary JSON are written and the report is
    printed. Chapters without chunks or without standard items are skipped.

    Args:
        json_path: Path to the final_result JSON file.
        csv_path: Path to StandardCategoryTable.csv.
        chapter_codes: Chapter codes to test; ``None`` tests every chapter
            found in the document.
        output_dir: Output directory; defaults to ``output`` next to this file.

    Returns:
        The list of per-chapter ``CompareResult`` records.
    """
    logger.info("=" * 60)
    logger.info("开始完整性审查方案对比测试")
    logger.info("=" * 60)

    # Load data
    data = load_final_result(json_path)
    file_name = data.get("file_name", "unknown")
    logger.info(f"测试文件: {file_name}")

    # Decide which chapters to test
    if chapter_codes is None:
        chapter_codes = get_all_chapter_codes(data)
    logger.info(f"测试章节: {chapter_codes}")

    # Output directory
    if output_dir is None:
        output_dir = str(Path(__file__).parent / "output")
    Path(output_dir).mkdir(parents=True, exist_ok=True)

    # Compare chapter by chapter
    all_results: List[CompareResult] = []

    for chapter_code in chapter_codes:
        logger.info(f"\n{'─' * 40}")
        logger.info(f"测试章节: {chapter_code}")
        logger.info(f"{'─' * 40}")

        chunks = extract_chunks_by_chapter(data, chapter_code)
        if not chunks:
            logger.warning(f"章节 {chapter_code} 无chunks,跳过")
            continue

        chapter_name = chunks[0].get("first_name", chapter_code)
        logger.info(f" chunks数量: {len(chunks)}")

        # Load the standard requirements
        standard_items = load_standard_items_for_chapter(csv_path, chapter_code)
        logger.info(f" 标准要求项数: {len(standard_items)}")

        if not standard_items:
            logger.warning(f"章节 {chapter_code} 无标准要求,跳过")
            continue

        # ── Method A ──
        logger.info(" [方案A] 执行轻量级完整性审查...")
        a_result, a_time, a_llm_calls = await run_method_a(
            chunks=chunks,
            csv_path=csv_path,
            chapter_code=chapter_code,
        )
        logger.info(f" [方案A] 完成: 缺失{a_result.get('tertiary_completeness', {}).get('missing', 0)}项, "
                    f"耗时{a_time:.2f}s")

        # ── Method B ──
        logger.info(" [方案B] 执行直接LLM审查...")
        b_result = await run_direct_llm_check(
            chunks=chunks,
            standard_items=standard_items,
            chapter_code=chapter_code,
            chapter_name=chapter_name,
        )
        logger.info(f" [方案B] 完成: 缺失{b_result.missing_count}项, "
                    f"耗时{b_result.execution_time:.2f}s")

        # ── Compare ──
        cr = compare_results(
            chapter_code=chapter_code,
            chapter_name=chapter_name,
            method_a=a_result,
            method_b=b_result,
            a_time=a_time,
            a_llm_calls=a_llm_calls,
        )
        all_results.append(cr)

        # Save the detailed result of this chapter
        detail_path = Path(output_dir) / f"detail_{chapter_code}.json"
        with open(detail_path, "w", encoding="utf-8") as f:
            json.dump({
                "chapter_code": chapter_code,
                "chapter_name": chapter_name,
                "method_a": a_result,
                "method_b": direct_result_to_dict(b_result),
                "comparison": {
                    "a_only_missing": cr.a_only_missing,
                    "b_only_missing": cr.b_only_missing,
                    "agreement_count": cr.agreement_count,
                    "disagreement_count": cr.disagreement_count,
                }
            }, f, ensure_ascii=False, indent=2)

    # Generate the report
    report_path = Path(output_dir) / "comparison_report.txt"
    report_text = generate_report(all_results, str(report_path))
    logger.info(f"\n报告已保存: {report_path}")

    # Save the summary JSON
    summary_path = Path(output_dir) / "comparison_summary.json"
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump({
            "file_name": file_name,
            "chapter_count": len(all_results),
            "chapters": [
                {
                    "code": cr.chapter_code,
                    "name": cr.chapter_name,
                    "a_missing": cr.a_missing,
                    "b_missing": cr.b_missing,
                    "a_time": round(cr.a_execution_time, 2),
                    "b_time": round(cr.b_execution_time, 2),
                    "agreement": cr.agreement_count,
                    "disagreement": cr.disagreement_count,
                    "a_only_missing": cr.a_only_missing,
                    "b_only_missing": cr.b_only_missing,
                }
                for cr in all_results
            ]
        }, f, ensure_ascii=False, indent=2)

    # Print the report
    print(report_text)

    return all_results


# ─────────────────────────────────────────────────────────────────────────────
# Entry point
# ─────────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # Use the most recent test data by default
    RESULT_DIR = Path(PROJECT_ROOT) / "temp" / "construction_review" / "final_result"
    CSV_PATH = Path(PROJECT_ROOT) / "core" / "construction_review" / "component" / "doc_worker" / "config" / "StandardCategoryTable.csv"

    # Pick one test file (the most recently modified)
    result_files = sorted(RESULT_DIR.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True)
    if not result_files:
        print("未找到测试数据文件")
        sys.exit(1)

    test_file = result_files[0]
    print(f"使用测试文件: {test_file.name}")

    # Chapters may be specified explicitly; None means all chapters
    # test_chapters = ["basis", "overview", "technology"]
    test_chapters = None  # all chapters

    asyncio.run(run_comparison(
        json_path=str(test_file),
        csv_path=str(CSV_PATH),
        chapter_codes=test_chapters,
    ))