"""
完整性审查对比测试 - FastAPI后端
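提供API端点:
GET /api/compare/files — 列出可用测试文件
POST /api/compare/chapters — 获取文件的章节列表
POST /api/compare/run — 执行测试(SSE流式返回)
POST /api/compare/export — 导出单文件对比报告(ZIP)
POST /api/compare/batch/run — 批量执行测试(SSE流式返回)
POST /api/compare/batch/export — 导出批量对比报告(ZIP)
GET / — 返回前端页面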
"""
import asyncio
import io
import json
import sys
import time
import uuid
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
# Third parent of this file's path; the `utils_test...` imports below are
# resolved relative to it, so it has to be on the import path first.
PROJECT_ROOT = str(Path(__file__).parent.parent.parent)
if PROJECT_ROOT not in sys.path:
    # The guard previously had no statement that actually registered the root.
    sys.path.insert(0, PROJECT_ROOT)
from utils_test.Completeness_Compare_Test.compare_test import (
extract_chunks_by_chapter,
get_all_chapter_codes,
load_final_result,
load_standard_items_for_chapter,
run_method_a,
compare_results,
)
from utils_test.Completeness_Compare_Test.method_b_direct_llm import (
run_direct_llm_check,
direct_result_to_dict,
)
# ── 路径常量 ──
# Directory that is globbed for "*.json" final-result files.
RESULT_DIR = Path(PROJECT_ROOT, "temp", "construction_review", "final_result")
# CSV handed to the standard-item loader and to method A.
CSV_PATH = Path(
    PROJECT_ROOT,
    "core",
    "construction_review",
    "component",
    "doc_worker",
    "config",
    "StandardCategoryTable.csv",
)
# Front-end page returned by the "/" endpoint; lives next to this module.
HTML_PATH = Path(__file__).with_name("index.html")
# ── FastAPI 应用 ──
# FastAPI application instance; every route decorator below registers on it.
app = FastAPI(title="完整性审查对比测试")
# CORS is left fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ── 工具函数 ──
def _format_sse(event: str, data: Any) -> str:
"""格式化SSE事件"""
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
def _find_file(file_id: str) -> Optional[Path]:
    """Return the ``*.json`` file in RESULT_DIR whose stem equals ``file_id``.

    Returns ``None`` when no file matches.
    """
    candidates = (
        entry for entry in RESULT_DIR.glob("*.json") if entry.stem == file_id
    )
    return next(candidates, None)
def _pick_5_distinct_files() -> List[Path]:
    """Pick up to five distinct result files, newest first.

    A file is accepted only if its hash prefix (stem text before the first
    "-") and its ``file_name`` field have not been accepted before, and it
    yields at least three chapter codes. Files that fail to load or inspect
    are skipped.
    """
    newest_first = sorted(
        RESULT_DIR.glob("*.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )

    accepted: List[Path] = []
    used_prefixes = set()
    used_names = set()

    for candidate in newest_first:
        prefix = candidate.stem.split("-")[0]
        if prefix in used_prefixes:
            continue

        try:
            payload = load_final_result(str(candidate))
            if len(get_all_chapter_codes(payload)) < 3:
                # Too few chapters: treated as an incomplete file.
                continue
            doc_name = payload.get("file_name", "")
            if doc_name in used_names:
                continue
            used_names.add(doc_name)
        except Exception:
            continue

        # The prefix is only reserved once the file is actually accepted.
        used_prefixes.add(prefix)
        accepted.append(candidate)
        if len(accepted) >= 5:
            break

    return accepted
def _make_zip_response(html_content: str, zip_filename: str) -> StreamingResponse:
    """Wrap ``html_content`` in an in-memory ZIP and return it as a download.

    The archive holds a single UTF-8 encoded entry named after
    ``zip_filename`` with ".zip" replaced by ".html". The response carries an
    ASCII fallback filename plus the percent-encoded real name.
    """
    from urllib.parse import quote

    archive = io.BytesIO()
    entry_name = zip_filename.replace(".zip", ".html")
    with zipfile.ZipFile(
        archive, mode="w", compression=zipfile.ZIP_DEFLATED
    ) as bundle:
        bundle.writestr(entry_name, html_content.encode("utf-8"))
    payload = archive.getvalue()

    quoted_name = quote(zip_filename, safe="")
    response_headers = {
        "Content-Disposition": (
            f"attachment; filename=report.zip; filename*=UTF-8''{quoted_name}"
        ),
        "Access-Control-Allow-Origin": "*",
    }
    return StreamingResponse(
        iter([payload]),
        media_type="application/zip",
        headers=response_headers,
    )
def _gen_report_html(
    chapters: List[Dict], summary: Dict, file_name: str, mode: str
) -> str:
    """Render the single-file test report as a standalone HTML document.

    Args:
        chapters: Per-chapter result dicts. Each may carry ``method_a``,
            ``method_b``, ``comparison`` and ``code_name_map`` entries.
        summary: Aggregate statistics with optional ``method_a``,
            ``method_b`` and ``comparison`` sub-dicts.
        file_name: Name of the reviewed document, shown in title and header.
        mode: "compare", "method_a" or "method_b"; other values are shown
            verbatim.

    Returns:
        The complete HTML page with the stylesheet from ``_report_css``
        inlined.

    All interpolated values come from the request payload, so each one is
    HTML-escaped, and missing keys fall back to defaults instead of raising.
    The markup uses the class names defined by ``_report_css``.
    """
    from html import escape

    def esc(value: Any) -> str:
        """HTML-escape any value after converting it to text."""
        return escape(str(value))

    def as_dict(value: Any) -> Dict:
        """Return ``value`` when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def rate(value: Any) -> str:
        """Format a percentage with one decimal; non-numbers are shown escaped."""
        try:
            return f"{float(value):.1f}%"
        except (TypeError, ValueError):
            return esc(value)

    def confidence_pct(raw: Any) -> str:
        """Turn a 0..1 confidence into a whole percentage; "-" when invalid."""
        try:
            return f"{int(float(raw or 0) * 100)}%"
        except (TypeError, ValueError, OverflowError):
            return "-"

    def stat_card(value: Any, label: str, unit: str = "", tone: str = "") -> str:
        """Render one card of the statistics grid."""
        css_class = f"stat-card {tone}".strip()
        return (
            f'<div class="{css_class}">'
            f'<div class="value">{esc(value)}{unit}</div>'
            f'<div class="label">{label}</div></div>'
        )

    chapters = [r for r in (chapters or []) if isinstance(r, dict)]
    summary = as_dict(summary)
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    css = _report_css()

    # ── Statistic cards ──
    cards = [
        stat_card(summary.get("total_chapters", len(chapters)), "测试章节数"),
        stat_card(summary.get("total_time", 0), "总耗时", unit="s"),
    ]
    for key, title in (("method_a", "方案A"), ("method_b", "方案B")):
        part = as_dict(summary.get(key))
        if part:
            cards.append(
                stat_card(part.get("total_time", 0), f"{title}总耗时", unit="s")
            )
            cards.append(
                stat_card(part.get("total_missing", 0), f"{title}总缺失", tone="red")
            )
    overall = as_dict(summary.get("comparison"))
    if overall:
        cards.append(
            stat_card(overall.get("agreement_rate", 0), "一致率", unit="%", tone="green")
        )
        cards.append(stat_card(overall.get("total_agreement", 0), "一致项", tone="green"))
        cards.append(
            stat_card(overall.get("total_disagreement", 0), "分歧项", tone="orange")
        )
    stats = "".join(cards)

    # ── Comparison table (only chapters that carry a comparison) ──
    columns = (
        "章节", "方案A缺失", "方案A完整率", "方案A耗时",
        "方案B缺失", "方案B完整率", "方案B耗时", "一致", "分歧",
    )
    thead = "".join(f"<th>{col}</th>" for col in columns)
    row_parts = []
    for r in chapters:
        c = as_dict(r.get("comparison"))
        if not c:
            continue
        mb = as_dict(r.get("method_b"))
        code = esc(r.get("chapter_code", ""))
        name = esc(r.get("chapter_name", ""))
        cells = (
            f"{code} {name}",
            esc(c.get("a_missing", "")),
            rate(c.get("a_rate", 0)),
            esc(c.get("a_time", "")) + "s",
            esc(c.get("b_missing", "")),
            rate(c.get("b_rate", 0)),
            # Method B's time is taken from its own result, as before.
            esc(mb.get("execution_time", "")) + "s",
            esc(c.get("agreement", "")),
            esc(c.get("disagreement", "")),
        )
        row_parts.append(
            "<tr>" + "".join(f"<td>{cell}</td>" for cell in cells) + "</tr>"
        )
    rows = "".join(row_parts)

    # ── Disagreement analysis ──
    diff_parts = []
    for r in chapters:
        c = as_dict(r.get("comparison"))
        a_only = c.get("a_only_missing") or []
        b_only = c.get("b_only_missing") or []
        if not a_only and not b_only:
            continue
        nm = as_dict(r.get("code_name_map"))
        code = esc(r.get("chapter_code", ""))
        name = esc(r.get("chapter_name", ""))
        diff_parts.append(f"<h3>{code} - {name}</h3>")
        if a_only:
            names_a = ", ".join(esc(nm.get(x, x)) for x in a_only)
            diff_parts.append(
                '<div class="diff-item a-only">'
                f"仅方案A认为缺失(方案B认为已覆盖):{len(a_only)}项<br>"
                f'<span class="code">{names_a}</span></div>'
            )
        if b_only:
            names_b = ", ".join(esc(nm.get(x, x)) for x in b_only)
            diff_parts.append(
                '<div class="diff-item b-only">'
                f"仅方案B认为缺失(方案A认为已覆盖):{len(b_only)}项<br>"
                f'<span class="code">{names_b}</span></div>'
            )
    diff = "".join(diff_parts)
    if not diff:
        diff = '<div class="empty-state">无分歧项,两种方案判断完全一致</div>'

    # ── Per-chapter details ──
    detail_parts = []
    for r in chapters:
        code = esc(r.get("chapter_code", ""))
        name = esc(r.get("chapter_name", ""))
        detail_parts.append(f"<h3>{code} - {name}</h3>")

        # Method A: one optional "pass" line, then every other recommendation.
        ma = as_dict(r.get("method_a"))
        if ma:
            recs = [
                rec
                for rec in (as_dict(ma.get("result")).get("recommendations") or [])
                if isinstance(rec, dict)
            ]
            pass_rec = next((rec for rec in recs if rec.get("level") == "通过"), None)
            issue_recs = [rec for rec in recs if rec.get("level") != "通过"]
            detail_parts.append("<h4>方案A(先分类再比对)</h4>")
            if pass_rec:
                issue = esc(pass_rec.get("issue_point", ""))
                detail_parts.append(f'<div class="item-row">{issue}</div>')
            for rec in issue_recs:
                level = esc(rec.get("level", ""))
                issue = esc(rec.get("issue_point", ""))
                location = esc(rec.get("location", "-"))
                block = [
                    '<div class="item-row">',
                    f'<span class="badge badge-red">[{level}]</span> {issue}',
                    f'<div class="meta">位置: {location}</div>',
                ]
                if rec.get("suggestion"):
                    suggestion = esc(rec["suggestion"])
                    block.append(f'<div class="meta">建议: {suggestion}</div>')
                if rec.get("reason"):
                    reason = esc(rec["reason"])
                    block.append(f'<div class="meta">依据: {reason}</div>')
                block.append("</div>")
                detail_parts.append("".join(block))

        # Method B: every missing item, then at most five covered items.
        mb = as_dict(r.get("method_b"))
        if mb:
            detail_parts.append("<h4>方案B(直接LLM解释)</h4>")
            items = [i for i in (mb.get("items") or []) if isinstance(i, dict)]
            covered = [i for i in items if i.get("is_covered")]
            missing = [i for i in items if not i.get("is_covered")]
            for item in missing:
                std_name = esc(item.get("standard_name", ""))
                std_code = esc(item.get("standard_code", ""))
                reason = esc(item.get("reason", "-"))
                confidence = confidence_pct(item.get("confidence", 0))
                detail_parts.append(
                    '<div class="item-row">'
                    f'<span class="badge badge-red">缺失</span> {std_name} ({std_code})'
                    f'<div class="meta">原因: {reason}</div>'
                    f'<div class="meta">置信度: {confidence}</div></div>'
                )
            if covered:
                detail_parts.append(f'<div class="meta">已覆盖 {len(covered)} 项:</div>')
                for item in covered[:5]:
                    std_name = esc(item.get("standard_name", ""))
                    ev = str(item.get("evidence", "") or "")[:120]
                    evidence_html = (
                        f'<div class="meta">证据: {esc(ev)}...</div>' if ev else ""
                    )
                    detail_parts.append(
                        '<div class="item-row">'
                        f'<span class="badge badge-green">覆盖</span> {std_name}'
                        f"{evidence_html}</div>"
                    )
                if len(covered) > 5:
                    detail_parts.append(
                        f'<div class="meta">... 还有 {len(covered) - 5} 项</div>'
                    )
    details = "".join(detail_parts)

    mode_label = {
        "compare": "双方案对比",
        "method_a": "仅方案A",
        "method_b": "仅方案B",
    }.get(mode, mode)
    safe_name = esc(file_name)

    return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>完整性审查对比报告 - {safe_name}</title>
<style>{css}</style>
</head>
<body>
<div class="container">
<header>
<h1>完整性审查对比报告 - {safe_name}</h1>
<p>{esc(mode_label)} | {ts}</p>
</header>
<div class="panel"><div class="stats-grid">{stats}</div></div>
<div class="panel"><table><thead><tr>{thead}</tr></thead><tbody>{rows}</tbody></table></div>
<div class="panel">{diff}</div>
<div class="panel">{details}</div>
</div>
</body>
</html>
"""
def _report_css() -> str:
    """Return the stylesheet text that the report pages embed inline.

    The rules cover the page container, header, panels, statistic cards,
    tables, badges, diff items and item rows, plus an ``@media print`` block.
    """
    return """
*{margin:0;padding:0;box-sizing:border-box}
body{font-family:-apple-system,BlinkMacSystemFont,"Segoe UI",Roboto,"Helvetica Neue",Arial,sans-serif;background:#f5f7fa;color:#333;line-height:1.6;font-size:14px}
.container{max-width:1100px;margin:0 auto;padding:20px}
header{background:linear-gradient(135deg,#667eea 0%,#764ba2 100%);color:#fff;padding:20px;border-radius:10px;margin-bottom:16px}
header h1{font-size:22px;margin-bottom:4px}
header p{opacity:.9;font-size:12px}
.panel{background:#fff;border-radius:10px;padding:16px;margin-bottom:14px;box-shadow:0 1px 4px rgba(0,0,0,.04);break-inside:avoid}
.panel h2{font-size:15px;margin-bottom:10px;padding-bottom:8px;border-bottom:2px solid #f0f0f0}
.stats-grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(140px,1fr));gap:10px;margin-bottom:12px}
.stat-card{background:linear-gradient(135deg,#f8f9ff,#f0f2ff);border-radius:8px;padding:12px;text-align:center;border:1px solid #e8ecff}
.stat-card .value{font-size:24px;font-weight:700;color:#667eea}
.stat-card .label{font-size:11px;color:#888;margin-top:2px}
.stat-card.green .value{color:#22c55e}
.stat-card.red .value{color:#ef4444}
.stat-card.orange .value{color:#f59e0b}
table{width:100%;border-collapse:collapse;font-size:12px}
th,td{padding:8px 10px;text-align:left;border-bottom:1px solid #f0f0f0}
th{background:#f8f9fa;font-weight:600;color:#555}
tr:hover{background:#fafbff}
.badge{display:inline-block;padding:1px 6px;border-radius:8px;font-size:10px;font-weight:600}
.badge-green{background:#dcfce7;color:#16a34a}
.badge-red{background:#fee2e2;color:#dc2626}
.badge-blue{background:#dbeafe;color:#2563eb}
.badge-orange{background:#fef3c7;color:#d97706}
.diff-item{padding:8px 10px;border-radius:6px;margin-bottom:6px;font-size:12px}
.diff-item.a-only{background:#fef2f2;border-left:3px solid #ef4444}
.diff-item.b-only{background:#fff7ed;border-left:3px solid #f59e0b}
.diff-item .code{font-family:monospace;font-weight:600;font-size:11px}
.item-row{padding:6px 10px;border-radius:5px;margin-bottom:4px;font-size:12px;background:#f8f9fa}
.item-row .meta{font-size:10px;color:#888;margin-top:1px}
.empty-state{text-align:center;padding:30px;color:#aaa;font-size:13px}
@media print {
body{background:#fff}
.panel{box-shadow:none;border:1px solid #e0e0e0}
header{background:#667eea!important;-webkit-print-color-adjust:exact;print-color-adjust:exact}
}
"""
def _gen_conclusion(
files_data: List[Dict], avg_rate: float,
total_a_m: int, total_b_m: int,
total_a_t: float, total_b_t: float,
) -> str:
"""根据批量数据生成AI对比分析结论(面向业务人员)"""
from collections import Counter
a_only_all = []
b_only_all = []
for f in files_data:
nm = f.get("code_name_map", {})
for c in f.get("chapters", []):
for code in c.get("a_only_missing", []):
a_only_all.append(nm.get(code, code))
for code in c.get("b_only_missing", []):
b_only_all.append(nm.get(code, code))
top_a = Counter(a_only_all).most_common(3)
top_b = Counter(b_only_all).most_common(3)
speedup = round(total_b_t / max(total_a_t, 0.1), 1)
if total_a_m > total_b_m:
diff_text = f"方案A比方案B多报出 {total_a_m - total_b_m} 项缺失"
elif total_b_m > total_a_m:
diff_text = f"方案B比方案A多报出 {total_b_m - total_a_m} 项缺失"
else:
diff_text = "两种方案报出的缺失总数相近"
avg_rate_text = "高" if avg_rate >= 90 else "中等" if avg_rate >= 75 else "一般"
parts = []
# 结论总述
parts.append(f"""
AI 对比分析结论
对 {len(files_data)} 份施工方案文档进行双方案完整性审查对比,两方案判断 一致率约 {avg_rate}%({avg_rate_text}一致性)。{diff_text}。
""")
# 两方案特点对比
parts.append(f"""
两方案特点对比
| 维度 | 方案A:先分类再比对 | 方案B:直接LLM解释 |
| 审查逻辑 |
先由分类器将文档内容归类到标准代码,再用集合运算判断是否覆盖。分类器漏分则误判缺失。 |
将文档原文和标准要求一起交给LLM,LLM逐条判断,并给出证据原文和判断理由。 |
| 可解释性 |
较弱。输出为模板字符串拼接,审查人员无法直接看到判断依据,需回溯分类链路。 |
强。每条判断都附带文档原文引用和具体理由,审查人员可直接验证,无需追溯中间过程。 |
| 漏报风险 |
分类器对组织架构类、人员职责类标准项召回率偏低,有内容也可能误判缺失。 |
基于语义理解,不受分类器限制。但对细节参数(预警值、监测频率)可能过度严格。 |
| 误报风险 |
低。集合运算确定性高,分类正确则判断正确。 |
中等。LLM判断有随机性,同文档多次运行可能略有差异。 |
| 速度 |
快(~1s/章节),大部分运算为集合操作。 |
慢(~10s/章节),每章节需完整LLM推理。并发可改善。 |
| 扩展性 |
差。新增标准需调整分类器。 |
好。新增标准只改CSV和prompt。 |
| 客户理解成本 |
链路绕,解释困难。 |
直观:文档+标准→AI判断→证据+结论,一句话讲清楚。 |
""")
# 分歧模式分析
parts.append('分歧模式分析
')
if top_a:
a_items = "、".join(f"{name}({cnt}次)" for name, cnt in top_a)
parts.append(f"""
方案A反复漏报(B认为已覆盖):{a_items}
→ 多为组织架构/人员职责类标准,分类器召回率偏低。方案B能通过语义理解正确识别。
""")
if top_b:
b_items = "、".join(f"{name}({cnt}次)" for name, cnt in top_b)
parts.append(f"""
方案B反复报告(A认为已覆盖):{b_items}
→ 多为预警值/监测频率等细节参数类标准。B对此判断更严格,需人工确认。
""")
if not top_a and not top_b:
parts.append('
两方案分歧较为分散,未出现系统性高频分歧项。
')
parts.append('
')
# 建议
parts.append(f"""
建议
- 建议采用方案B作为主方案。核心优势在可解释性:每条判断有证据原文和推理理由,客户沟通直观,审查人员可直接验证。
- 融合方案:B的判断为主,A的分类结果作上下文增强,帮助LLM更准确定位文档内容。
- 校准分歧:对高频分歧项人工抽查2-3个章节原文,确认哪方更准确,据此调整prompt或分类器。
- 性能:B每章节~{speedup}x于A(串行),已通过并发改善。后续可缓存LLM结果。
""")
return "".join(parts)
# ── 端点:首页 ──
@app.get("/", response_class=HTMLResponse)
async def index():
    """Serve the front-end page stored at HTML_PATH."""
    with HTML_PATH.open("r", encoding="utf-8") as page:
        return page.read()
# ── 端点:列出文件 ──
@app.get("/api/compare/files")
async def list_files():
    """List the result files in RESULT_DIR, most recently modified first.

    Each entry carries the file id (stem), the stored ``file_name`` (falling
    back to the file's own name) and the number of chunks found under
    ``document_result.structured_content.chunks``. Files that cannot be read
    or parsed are left out.
    """
    newest_first = sorted(
        RESULT_DIR.glob("*.json"),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )

    entries = []
    for path in newest_first:
        try:
            payload = json.loads(path.read_text(encoding="utf-8"))
            entry = {
                "file_id": path.stem,
                "file_name": payload.get("file_name", path.name),
                "chunks_count": len(
                    payload.get("document_result", {})
                    .get("structured_content", {})
                    .get("chunks", [])
                ),
            }
        except Exception:
            continue
        entries.append(entry)

    return JSONResponse(content={"files": entries})
# ── 端点:获取章节列表 ──
@app.post("/api/compare/chapters")
async def get_chapters(request: Request):
    """Return the chapter list of the file named by ``file_id`` in the body.

    Responds with 404 when no such file exists. Each chapter entry holds its
    code, a display name (``first_name`` of its first chunk, or the code when
    there is none) and its chunk count.
    """
    payload = await request.json()
    target = _find_file(payload.get("file_id", ""))
    if target is None:
        return JSONResponse(status_code=404, content={"error": "文件不存在"})

    result = load_final_result(str(target))

    def describe(code):
        """Build the listing entry for one chapter code."""
        chapter_chunks = extract_chunks_by_chapter(result, code)
        if chapter_chunks:
            title = chapter_chunks[0].get("first_name", code)
        else:
            title = code
        return {
            "code": code,
            "name": title,
            "chunks_count": len(chapter_chunks),
        }

    listing = [describe(code) for code in get_all_chapter_codes(result)]
    return JSONResponse(content={"chapters": listing})
# ── 端点:执行测试(SSE) ──
@app.post("/api/compare/run")
async def run_test(request: Request):
    """Run the selected method(s) on one file and stream progress as SSE.

    Reads ``file_id``, ``chapters`` and ``mode`` from the JSON body and
    returns 404 when the file is not found. The stream emits, in order:
    ``started``, then per chapter ``progress`` events and one
    ``chapter_result``, then ``summary``. Any exception raised while
    streaming is reported as a single ``error`` event.
    """
    body = await request.json()
    file_id = body.get("file_id", "")
    chapters = body.get("chapters", [])
    mode = body.get("mode", "compare")  # method_a | method_b | compare
    fpath = _find_file(file_id)
    if not fpath:
        return JSONResponse(status_code=404, content={"error": "文件不存在"})

    async def event_stream():
        try:
            data = load_final_result(str(fpath))
            file_name = data.get("file_name", file_id)
            # No chapters requested: process every chapter in the file.
            if not chapters:
                chapter_codes = get_all_chapter_codes(data)
            else:
                chapter_codes = chapters
            total = len(chapter_codes)
            all_results = []
            start_all = time.time()
            yield _format_sse("started", {
                "file_name": file_name,
                "total_chapters": total,
                "mode": mode,
            })
            for idx, chapter_code in enumerate(chapter_codes):
                chunks = extract_chunks_by_chapter(data, chapter_code)
                if not chunks:
                    # Chapters without chunks are reported and skipped; they
                    # do not appear in the results or the summary.
                    yield _format_sse("progress", {
                        "chapter": chapter_code,
                        "status": "skipped",
                        "reason": "无chunks",
                        "current": idx + 1,
                        "total": total,
                    })
                    continue
                chapter_name = chunks[0].get("first_name", chapter_code)
                standard_items = load_standard_items_for_chapter(
                    str(CSV_PATH), chapter_code
                )
                chapter_result = {
                    "chapter_code": chapter_code,
                    "chapter_name": chapter_name,
                    "mode": mode,
                    "code_name_map": {si["third_code"]: si["third_name"] for si in standard_items},
                }
                # ── Method A ──
                if mode in ("method_a", "compare"):
                    yield _format_sse("progress", {
                        "chapter": chapter_code,
                        "chapter_name": chapter_name,
                        "status": "running",
                        "method": "A",
                        "current": idx + 1,
                        "total": total,
                    })
                    a_result, a_time, a_llm_calls = await run_method_a(
                        chunks=chunks,
                        csv_path=str(CSV_PATH),
                        chapter_code=chapter_code,
                    )
                    chapter_result["method_a"] = {
                        "result": a_result,
                        "time": round(a_time, 2),
                        "llm_calls": a_llm_calls,
                    }
                # ── Method B ──
                if mode in ("method_b", "compare"):
                    yield _format_sse("progress", {
                        "chapter": chapter_code,
                        "chapter_name": chapter_name,
                        "status": "running",
                        "method": "B",
                        "current": idx + 1,
                        "total": total,
                    })
                    b_result = await run_direct_llm_check(
                        chunks=chunks,
                        standard_items=standard_items,
                        chapter_code=chapter_code,
                        chapter_name=chapter_name,
                    )
                    chapter_result["method_b"] = direct_result_to_dict(b_result)
                # ── Comparison ──
                # Runs only when both methods produced a result for this
                # chapter; method B is passed as the raw result object, not
                # as the dict stored above.
                if mode == "compare" and "method_a" in chapter_result and "method_b" in chapter_result:
                    cr = compare_results(
                        chapter_code=chapter_code,
                        chapter_name=chapter_name,
                        method_a=chapter_result["method_a"]["result"],
                        method_b=b_result,
                        a_time=chapter_result["method_a"]["time"],
                        a_llm_calls=chapter_result["method_a"]["llm_calls"],
                    )
                    chapter_result["comparison"] = {
                        "a_missing": cr.a_missing,
                        "b_missing": cr.b_missing,
                        "a_rate": cr.a_completeness_rate,
                        "b_rate": cr.b_completeness_rate,
                        "a_time": cr.a_execution_time,
                        "b_time": cr.b_execution_time,
                        "agreement": cr.agreement_count,
                        "disagreement": cr.disagreement_count,
                        "a_only_missing": cr.a_only_missing,
                        "b_only_missing": cr.b_only_missing,
                        "a_missing_details": cr.a_missing_details,
                        "b_items": cr.b_items,
                        "a_recommendations": cr.a_recommendations,
                    }
                all_results.append(chapter_result)
                yield _format_sse("chapter_result", chapter_result)
            # ── Summary ──
            total_time = time.time() - start_all
            summary = _build_summary(all_results, mode, total_time)
            yield _format_sse("summary", summary)
        except Exception as e:
            # The HTTP status is already sent once streaming has begun, so
            # failures are surfaced to the client as an SSE event.
            yield _format_sse("error", {"message": str(e)})

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*",
        },
    )
def _build_summary(
results: List[Dict], mode: str, total_time: float
) -> Dict[str, Any]:
"""构建汇总统计"""
summary: Dict[str, Any] = {
"mode": mode,
"total_chapters": len(results),
"total_time": round(total_time, 2),
}
if mode in ("method_a", "compare"):
a_times = [r["method_a"]["time"] for r in results if "method_a" in r]
a_missing = []
for r in results:
if "method_a" in r:
tertiary = r["method_a"]["result"].get("tertiary_completeness", {})
a_missing.append(tertiary.get("missing", 0))
summary["method_a"] = {
"total_time": round(sum(a_times), 2),
"avg_time": round(sum(a_times) / len(a_times), 2) if a_times else 0,
"total_missing": sum(a_missing),
"avg_missing": round(sum(a_missing) / len(a_missing), 1) if a_missing else 0,
}
if mode in ("method_b", "compare"):
b_times = [r["method_b"]["execution_time"] for r in results if "method_b" in r]
b_missing = [r["method_b"]["missing_count"] for r in results if "method_b" in r]
summary["method_b"] = {
"total_time": round(sum(b_times), 2),
"avg_time": round(sum(b_times) / len(b_times), 2) if b_times else 0,
"total_missing": sum(b_missing),
"avg_missing": round(sum(b_missing) / len(b_missing), 1) if b_missing else 0,
}
if mode == "compare":
agreements = [
r["comparison"]["agreement"] for r in results if "comparison" in r
]
disagreements = [
r["comparison"]["disagreement"] for r in results if "comparison" in r
]
total_agree = sum(agreements)
total_disagree = sum(disagreements)
summary["comparison"] = {
"total_agreement": total_agree,
"total_disagreement": total_disagree,
"agreement_rate": round(
total_agree / (total_agree + total_disagree) * 100, 1
)
if (total_agree + total_disagree) > 0
else 0,
}
return summary
# ═══════════════════════════════════════════════════════════════════
# 导出端点
# ═══════════════════════════════════════════════════════════════════
@app.post("/api/compare/export")
async def export_results(request: Request):
    """Turn result data posted by the front end into a zipped HTML report.

    The archive is named after the stem of the posted ``file_name``, or
    "report" when that stem is empty.
    """
    payload = await request.json()
    source_name = payload.get("file_name", "unknown")
    selected_mode = payload.get("mode", "compare")
    chapter_rows = payload.get("chapters", [])
    totals = payload.get("summary", {})

    report = _gen_report_html(chapter_rows, totals, source_name, selected_mode)

    stem = Path(source_name).stem
    if not stem:
        stem = "report"
    return _make_zip_response(report, "{}_对比报告.zip".format(stem))
# ═══════════════════════════════════════════════════════════════════
# 批量测试端点
# ═══════════════════════════════════════════════════════════════════
@app.post("/api/compare/batch/run")
async def run_batch_test(request: Request):
    """Run both methods over up to five files and stream progress as SSE.

    The optional JSON body may carry ``concurrency``; it is coerced to an
    integer and clamped to 1..5 (default 2). Events emitted:
    ``batch_started``, per file ``batch_file_started`` /
    ``batch_chapter_progress`` / ``batch_file_error`` / ``batch_file_result``,
    and finally ``batch_summary``.

    Every worker queues ``batch_file_done`` even when it fails part-way, so
    the stream always terminates. Workers are cancelled if the stream is
    closed early.
    """
    body: Dict[str, Any] = {}
    try:
        raw = await request.body()
        if raw:
            parsed = json.loads(raw)
            # Only a JSON object can carry options; anything else is ignored.
            if isinstance(parsed, dict):
                body = parsed
    except Exception:
        # Deliberate best-effort: an unreadable body means "use defaults".
        body = {}

    try:
        concurrency = int(body.get("concurrency", 2))
    except (TypeError, ValueError, OverflowError):
        concurrency = 2
    concurrency = max(1, min(concurrency, 5))  # clamp to 1-5

    async def batch_event_stream():
        files = _pick_5_distinct_files()
        if not files:
            yield _format_sse("error", {"message": "无可用测试文件"})
            return

        file_infos = []
        for f in files:
            try:
                d = load_final_result(str(f))
                file_infos.append({
                    "file_id": f.stem,
                    "file_name": d.get("file_name", f.name),
                })
            except Exception:
                file_infos.append({"file_id": f.stem, "file_name": f.name})

        yield _format_sse("batch_started", {
            "total_files": len(files),
            "concurrency": concurrency,
            "files": file_infos,
        })

        start_all = time.time()
        queue: asyncio.Queue = asyncio.Queue()
        sem = asyncio.Semaphore(concurrency)
        collected_results: List[Dict] = []

        async def analyse_file(fpath: Path, fid: str, fname: str) -> Dict:
            """Run both methods on every usable chapter of one file."""
            data = load_final_result(str(fpath))
            chapter_codes = get_all_chapter_codes(data)
            file_result = {
                "file_id": fid, "file_name": fname,
                "chapters": [], "code_name_map": {},
            }
            t_a = t_b = t_agree = t_disagree = t_am = t_bm = t_req = 0
            for ci, chapter_code in enumerate(chapter_codes):
                chunks = extract_chunks_by_chapter(data, chapter_code)
                if not chunks:
                    continue
                chapter_name = chunks[0].get("first_name", chapter_code)
                standard_items = load_standard_items_for_chapter(
                    str(CSV_PATH), chapter_code
                )
                for si in standard_items:
                    file_result["code_name_map"][si["third_code"]] = si["third_name"]
                if not standard_items:
                    continue
                await queue.put(("batch_chapter_progress", {
                    "file_id": fid, "chapter_code": chapter_code,
                    "chapter_name": chapter_name, "current": ci + 1,
                    "total": len(chapter_codes),
                }))
                a_result, a_time, alc = await run_method_a(
                    chunks=chunks, csv_path=str(CSV_PATH), chapter_code=chapter_code)
                b_result = await run_direct_llm_check(
                    chunks=chunks, standard_items=standard_items,
                    chapter_code=chapter_code, chapter_name=chapter_name)
                cr = compare_results(
                    chapter_code=chapter_code, chapter_name=chapter_name,
                    method_a=a_result, method_b=b_result,
                    a_time=a_time, a_llm_calls=alc)
                file_result["chapters"].append({
                    "chapter_code": chapter_code,
                    "chapter_name": chapter_name,
                    "a_total": cr.a_total_required,
                    "a_missing": cr.a_missing,
                    "a_rate": cr.a_completeness_rate,
                    "a_time": round(a_time, 2),
                    "b_total": cr.b_total_required,
                    "b_missing": cr.b_missing,
                    "b_rate": cr.b_completeness_rate,
                    "b_time": round(b_result.execution_time, 2),
                    "agreement": cr.agreement_count,
                    "disagreement": cr.disagreement_count,
                    "a_only_missing": cr.a_only_missing,
                    "b_only_missing": cr.b_only_missing,
                    "a_recommendations": [
                        {
                            "level": r.get("level", ""),
                            "issue_point": r.get("issue_point", ""),
                            "location": r.get("location", ""),
                            "suggestion": r.get("suggestion", ""),
                            "reason": r.get("reason", ""),
                        }
                        for r in cr.a_recommendations
                    ],
                    "b_items": [
                        {
                            "standard_code": item.get("standard_code", ""),
                            "standard_name": item.get("standard_name", ""),
                            "is_covered": item.get("is_covered", False),
                            "evidence": item.get("evidence", ""),
                            "reason": item.get("reason", ""),
                            "confidence": item.get("confidence", 0),
                        }
                        for item in cr.b_items
                    ],
                })
                t_a += a_time
                t_b += b_result.execution_time
                t_agree += cr.agreement_count
                t_disagree += cr.disagreement_count
                t_am += cr.a_missing
                t_bm += cr.b_missing
                t_req += cr.a_total_required
            judged = t_agree + t_disagree
            file_result["summary"] = {
                "chapter_count": len(file_result["chapters"]),
                "total_required": t_req,
                "total_a_missing": t_am, "total_b_missing": t_bm,
                "total_a_time": round(t_a, 2), "total_b_time": round(t_b, 2),
                "total_agreement": t_agree, "total_disagreement": t_disagree,
                "agreement_rate": round(t_agree / judged * 100, 1) if judged > 0 else 0,
            }
            return file_result

        async def process_one_file(idx: int, fpath: Path, fid: str, fname: str):
            """Worker: analyse one file and always report completion."""
            file_result = None
            try:
                async with sem:
                    await queue.put(("batch_file_started", {
                        "file_id": fid, "file_name": fname, "file_index": idx,
                    }))
                    file_result = await analyse_file(fpath, fid, fname)
            except Exception as e:
                # A failure anywhere in the file becomes an error event
                # instead of silently ending the task.
                await queue.put(("batch_file_error", {"file_id": fid, "error": str(e)}))
            finally:
                # The consumer counts these markers; without one per worker it
                # would wait forever. The queue is unbounded, so put_nowait
                # cannot fail and needs no await during cancellation.
                queue.put_nowait(("batch_file_done", {"file_id": fid, "result": file_result}))

        # Start the workers; the semaphore limits how many run at once.
        tasks = [
            asyncio.create_task(process_one_file(i, fpath, f["file_id"], f["file_name"]))
            for i, (fpath, f) in enumerate(zip(files, file_infos))
        ]

        # Relay queued events as SSE until every worker has reported done.
        done = 0
        total = len(tasks)
        try:
            while done < total:
                event_type, data = await queue.get()
                if event_type == "batch_file_done":
                    done += 1
                    if data.get("result"):
                        collected_results.append(data["result"])
                        yield _format_sse("batch_file_result", data["result"])
                else:
                    yield _format_sse(event_type, data)
        finally:
            # Reached early when the client disconnects: stop outstanding work.
            for task in tasks:
                if not task.done():
                    task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)

        # Summary, with files restored to their original selection order.
        total_time = time.time() - start_all
        all_chapters = sum(f["summary"]["chapter_count"] for f in collected_results)
        order = {info["file_id"]: pos for pos, info in enumerate(file_infos)}
        collected_results.sort(key=lambda r: order[r["file_id"]])
        batch_summary = {
            "total_files": len(collected_results),
            "total_chapters": all_chapters,
            "total_time": round(total_time, 2),
            "files": collected_results,
        }
        yield _format_sse("batch_summary", batch_summary)

    return StreamingResponse(
        batch_event_stream(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
            "Access-Control-Allow-Origin": "*",
        },
    )
@app.post("/api/compare/batch/export")
async def export_batch_results(request: Request):
    """Turn posted batch results into a detailed, zipped HTML report.

    The body's ``files`` list holds per-file results, each with ``summary``,
    ``chapters`` and ``code_name_map``. The page contains a statistics grid,
    the conclusion from ``_gen_conclusion``, a table of contents and one
    panel per file with a block per chapter.

    All interpolated values come from the request, so they are HTML-escaped,
    and missing keys fall back to defaults instead of raising ``KeyError``.
    The markup uses the class names defined by ``_report_css`` and by the
    extra rules added below.
    """
    from html import escape

    def esc(value: Any) -> str:
        """HTML-escape any value after converting it to text."""
        return escape(str(value))

    def as_dict(value: Any) -> Dict:
        """Return ``value`` when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def pct0(value: Any) -> str:
        """Format a percentage without decimals; non-numbers are shown escaped."""
        try:
            return f"{float(value):.0f}%"
        except (TypeError, ValueError):
            return esc(value)

    def confidence_pct(raw: Any) -> str:
        """Turn a 0..1 confidence into a whole percentage; "-" when invalid."""
        try:
            return f"{int(float(raw or 0) * 100)}%"
        except (TypeError, ValueError, OverflowError):
            return "-"

    def stat_card(value: Any, label: str, unit: str = "", tone: str = "") -> str:
        """Render one card of the statistics grid."""
        css_class = f"stat-card {tone}".strip()
        return (
            f'<div class="{css_class}">'
            f'<div class="value">{esc(value)}{unit}</div>'
            f'<div class="label">{label}</div></div>'
        )

    body = await request.json()
    files_data = body.get("files", []) if isinstance(body, dict) else []
    if not isinstance(files_data, list):
        files_data = []
    files_data = [f for f in files_data if isinstance(f, dict)]
    summaries = [as_dict(f.get("summary")) for f in files_data]

    def total(key: str) -> Any:
        """Sum one summary field over all files; absent values count as 0."""
        return sum((s.get(key) or 0) for s in summaries)

    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    css = _report_css() + """
.chapter-block{border:1px solid #e0e0e0;border-radius:8px;padding:14px;margin-bottom:14px;break-inside:avoid}
.chapter-block h3{font-size:14px;color:#667eea;margin-bottom:10px;padding-bottom:6px;border-bottom:1px solid #f0f0f0}
.method-section{margin-bottom:10px}
.method-section h4{font-size:12px;font-weight:600;margin-bottom:6px}
.cover-item{padding:6px 8px;margin:3px 0;border-radius:4px;font-size:12px;border-left:3px solid #22c55e;background:#f0fdf4}
.miss-item{padding:6px 8px;margin:3px 0;border-radius:4px;font-size:12px;border-left:3px solid #ef4444;background:#fef2f2}
.diff-block{margin-top:8px;padding:8px;border-radius:6px;background:#fffbe6;border:1px solid #fde68a;font-size:12px}
.chapter-summary-line{font-size:11px;color:#888;margin-bottom:8px}
.toc{background:#f8f9fa;border-radius:8px;padding:12px;margin-bottom:16px;font-size:13px}
.toc a{color:#667eea;text-decoration:none;margin:0 8px}
.section-divider{border:0;border-top:2px dashed #e0e0e0;margin:20px 0}
"""

    # Batch-wide totals
    total_files = len(files_data)
    total_chapters = total("chapter_count")
    total_a_m = total("total_a_missing")
    total_b_m = total("total_b_missing")
    total_a_t = total("total_a_time")
    total_b_t = total("total_b_time")
    avg_rate = (
        round(total("agreement_rate") / total_files, 1) if total_files > 0 else 0
    )

    # ── AI comparison conclusion ──
    conclusion = _gen_conclusion(
        files_data, avg_rate, total_a_m, total_b_m, total_a_t, total_b_t
    )

    stats = "".join((
        stat_card(total_files, "文件数"),
        stat_card(total_chapters, "总章节数"),
        stat_card(total_a_m, "方案A总缺失", tone="red"),
        stat_card(total_b_m, "方案B总缺失", tone="red"),
        stat_card(round(total_a_t, 2), "方案A总耗时", unit="s"),
        stat_card(round(total_b_t, 2), "方案B总耗时", unit="s"),
        stat_card(avg_rate, "平均一致率", unit="%", tone="green"),
    ))

    toc_links = []
    out = []
    for fi, f in enumerate(files_data):
        chapters = [c for c in (f.get("chapters") or []) if isinstance(c, dict)]
        fname = esc(f.get("file_name", f.get("file_id", "")))
        s = summaries[fi]
        nm = as_dict(f.get("code_name_map"))
        anchor = f"file-{fi + 1}"
        toc_links.append(f'<a href="#{anchor}">文件{fi + 1}: {fname}</a>')

        if fi > 0:
            out.append('<hr class="section-divider">')
        out.append(f'<div class="panel" id="{anchor}">')
        out.append(f"<h2>文件{fi + 1}: {fname}</h2>")
        out.append(
            '<div class="chapter-summary-line">'
            f'{esc(s.get("chapter_count", 0))}章节 | '
            f'总要求{esc(s.get("total_required", 0))}项 | '
            f'A缺失{esc(s.get("total_a_missing", 0))} | '
            f'B缺失{esc(s.get("total_b_missing", 0))} | '
            f'一致率{esc(s.get("agreement_rate", 0))}% | '
            f'A耗时{esc(s.get("total_a_time", 0))}s | '
            f'B耗时{esc(s.get("total_b_time", 0))}s'
            "</div>"
        )

        # Detailed review result of every chapter
        for c in chapters:
            code = esc(c.get("chapter_code", ""))
            name = esc(c.get("chapter_name", ""))
            out.append('<div class="chapter-block">')
            out.append(f"<h3>{code} — {name}</h3>")
            out.append(
                '<div class="chapter-summary-line">'
                f'A: {esc(c.get("a_missing", 0))}/{esc(c.get("a_total", 0))}缺失 '
                f'({pct0(c.get("a_rate", 0))}) | '
                f'B: {esc(c.get("b_missing", 0))}/{esc(c.get("b_total", 0))}缺失 '
                f'({pct0(c.get("b_rate", 0))}) | '
                f'一致{esc(c.get("agreement", 0))} | '
                f'分歧{esc(c.get("disagreement", 0))}'
                "</div>"
            )

            # ── Disagreements (shown first) ──
            a_only = c.get("a_only_missing") or []
            b_only = c.get("b_only_missing") or []
            if a_only or b_only:
                out.append('<div class="diff-block"><strong>差异项:</strong>')
                if a_only:
                    labels = ", ".join(esc(nm.get(x, x)) for x in a_only)
                    out.append(f" 仅A缺失: {labels};")
                if b_only:
                    labels = ", ".join(esc(nm.get(x, x)) for x in b_only)
                    out.append(f" 仅B缺失: {labels}")
                out.append("</div>")

            # ── Method A details ──
            a_recs = [r for r in (c.get("a_recommendations") or []) if isinstance(r, dict)]
            if a_recs:
                out.append('<div class="method-section"><h4>方案A — 审查结果</h4>')
                for rec in a_recs:
                    level = rec.get("level", "")
                    issue = esc(rec.get("issue_point", ""))
                    if level == "通过":
                        out.append(f'<div class="cover-item">{issue}</div>')
                        continue
                    block = [f'<div class="miss-item">[{esc(level)}] {issue}']
                    if rec.get("location"):
                        block.append(f' — {esc(rec["location"])}')
                    if rec.get("reason"):
                        block.append(f'<br>依据: {esc(rec["reason"])}')
                    if rec.get("suggestion"):
                        block.append(f'<br>建议: {esc(rec["suggestion"])}')
                    block.append("</div>")
                    out.append("".join(block))
                out.append("</div>")

            # ── Method B details ──
            b_items = [i for i in (c.get("b_items") or []) if isinstance(i, dict)]
            if b_items:
                covered = [i for i in b_items if i.get("is_covered")]
                missing = [i for i in b_items if not i.get("is_covered")]
                out.append('<div class="method-section"><h4>方案B — 逐项判断</h4>')
                if missing:
                    out.append(
                        f'<div class="chapter-summary-line">缺失 {len(missing)} 项:</div>'
                    )
                    for item in missing:
                        cn = esc(item.get("standard_name", item.get("standard_code", "")))
                        reason = esc(item.get("reason", "-"))
                        confidence = confidence_pct(item.get("confidence", 0))
                        out.append(
                            f'<div class="miss-item">缺失 - {cn}'
                            f"<br>原因: {reason} 置信度: {confidence}</div>"
                        )
                if covered:
                    out.append(
                        f'<div class="chapter-summary-line">已覆盖 {len(covered)} 项:</div>'
                    )
                    for item in covered:
                        cn = esc(item.get("standard_name", item.get("standard_code", "")))
                        ev = str(item.get("evidence", "") or "")[:200]
                        confidence = confidence_pct(item.get("confidence", 0))
                        block = [f'<div class="cover-item">覆盖 - {cn}']
                        # "无" is a placeholder for "no evidence"; it is not shown.
                        if ev and ev != "无":
                            block.append(f"<br>证据: {esc(ev)}")
                        block.append(f" 置信度: {confidence}</div>")
                        out.append("".join(block))
                out.append("</div>")

            out.append("</div>")  # chapter-block
        out.append("</div>")  # panel

    toc = f'<div class="toc">{"".join(toc_links)}</div>' if toc_links else ""
    file_details = "".join(out)

    page = f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>批量对比报告</title>
<style>{css}</style>
</head>
<body>
<div class="container">
<header>
<h1>批量对比报告</h1>
<p>{ts}</p>
</header>
<div class="panel"><div class="stats-grid">{stats}</div></div>
{conclusion}
{toc}
{file_details}
</div>
</body>
</html>
"""
    zip_name = f"批量对比报告_{datetime.now().strftime('%Y%m%d_%H%M%S')}.zip"
    return _make_zip_response(page, zip_name)