""" 批量对比测试 — 对5个不同文件执行方案A vs 方案B,输出汇总JSON供分析。 """ import asyncio import json import sys import time from pathlib import Path from collections import defaultdict PROJECT_ROOT = str(Path(__file__).parent.parent.parent) if PROJECT_ROOT not in sys.path: from utils_test.Completeness_Compare_Test.compare_test import ( load_final_result, extract_chunks_by_chapter, get_all_chapter_codes, load_standard_items_for_chapter, run_method_a, compare_results, ) from utils_test.Completeness_Compare_Test.method_b_direct_llm import ( run_direct_llm_check, direct_result_to_dict, ) RESULT_DIR = Path(PROJECT_ROOT) / "temp" / "construction_review" / "final_result" CSV_PATH = ( Path(PROJECT_ROOT) / "core" / "construction_review" / "component" / "doc_worker" / "config" / "StandardCategoryTable.csv" ) def pick_5_distinct_files(): """选出5个不同文件(按hash前缀去重,取最新的)""" files_by_hash = {} for f in sorted(RESULT_DIR.glob("*.json"), key=lambda p: p.stat().st_mtime, reverse=True): hash_prefix = f.stem.split("-")[0] if hash_prefix not in files_by_hash: files_by_hash[hash_prefix] = f if len(files_by_hash) >= 5: break return list(files_by_hash.values()) async def test_one_file(json_path: Path): """对一个文件的所有章节执行双方案对比""" data = load_final_result(str(json_path)) file_name = data.get("file_name", json_path.stem) chapter_codes = get_all_chapter_codes(data) file_result = { "file_id": json_path.stem, "file_name": file_name, "chapters": [], "summary": {}, } total_a_time = 0 total_b_time = 0 total_agree = 0 total_disagree = 0 total_a_missing = 0 total_b_missing = 0 total_required = 0 for chapter_code in chapter_codes: chunks = extract_chunks_by_chapter(data, chapter_code) if not chunks: continue chapter_name = chunks[0].get("first_name", chapter_code) standard_items = load_standard_items_for_chapter(str(CSV_PATH), chapter_code) if not standard_items: continue # 方案A a_result, a_time, a_llm_calls = await run_method_a( chunks=chunks, csv_path=str(CSV_PATH), chapter_code=chapter_code, ) # 方案B b_result = await run_direct_llm_check( chunks=chunks, standard_items=standard_items, chapter_code=chapter_code, chapter_name=chapter_name, ) # 对比 cr = compare_results( chapter_code=chapter_code, chapter_name=chapter_name, method_a=a_result, method_b=b_result, a_time=a_time, a_llm_calls=a_llm_calls, ) chapter_data = { "chapter_code": chapter_code, "chapter_name": chapter_name, "a_total": cr.a_total_required, "a_missing": cr.a_missing, "a_rate": cr.a_completeness_rate, "a_time": round(a_time, 2), "b_total": cr.b_total_required, "b_missing": cr.b_missing, "b_rate": cr.b_completeness_rate, "b_time": round(b_result.execution_time, 2), "agreement": cr.agreement_count, "disagreement": cr.disagreement_count, "a_only_missing": cr.a_only_missing, "b_only_missing": cr.b_only_missing, } file_result["chapters"].append(chapter_data) total_a_time += a_time total_b_time += b_result.execution_time total_agree += cr.agreement_count total_disagree += cr.disagreement_count total_a_missing += cr.a_missing total_b_missing += cr.b_missing total_required += cr.a_total_required n = len(file_result["chapters"]) file_result["summary"] = { "chapter_count": n, "total_required": total_required, "total_a_missing": total_a_missing, "total_b_missing": total_b_missing, "total_a_time": round(total_a_time, 2), "total_b_time": round(total_b_time, 2), "total_agreement": total_agree, "total_disagreement": total_disagree, "agreement_rate": round(total_agree / (total_agree + total_disagree) * 100, 1) if (total_agree + total_disagree) > 0 else 0, } return file_result async def main(): files = pick_5_distinct_files() print(f"选出 {len(files)} 个文件进行批量测试:") for f in files: print(f" - {f.name}") print() all_results = [] for i, fpath in enumerate(files): print(f"[{i+1}/{len(files)}] 测试: {fpath.name}") t0 = time.time() result = await test_one_file(fpath) result["total_wall_time"] = round(time.time() - t0, 2) all_results.append(result) s = result["summary"] print(f" 完成: {s['chapter_count']}章节, A缺失{s['total_a_missing']}, B缺失{s['total_b_missing']}, " f"一致率{s['agreement_rate']}%, 耗时{result['total_wall_time']}s") print() # 保存 out_path = Path(__file__).parent / "batch_result.json" with open(out_path, "w", encoding="utf-8") as fp: json.dump(all_results, fp, ensure_ascii=False, indent=2) print(f"结果已保存: {out_path}") if __name__ == "__main__": asyncio.run(main())