# rag_evaluator.py
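"""
RAG pipeline evaluator: automatic five-dimension evaluation of pipeline
output using an LLM plus rules.

Evaluation dimensions:
1. Recall relevance (0-5): whether the retrieved specification clauses are
   semantically aligned with the review point.
2. Parameter match (0-5): whether parameter values (mm, MPa, ℃) are matched
   in the specification clauses.
3. Hallucination risk (0-5, 5 = safest): whether search_queries fabricate
   concepts that do not exist in the source text.
4. Coverage (0-5): whether the key technical parameters of the source text
   were sufficiently extracted.
5. Score plausibility: whether the rerank_score distribution matches
   expectations.
"""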
import json
import re
import uuid
from dataclasses import dataclass
from typing import Optional

from foundation.observability.logger.loggering import review_logger as logger
# ========================= Evaluation result structures =========================
  17. @dataclass
  18. class DimensionScore:
  19. """单维度评分"""
  20. name: str
  21. score: float # 0-5
  22. max_score: float = 5.0
  23. details: str = ""
  24. status: str = "PASS" # PASS / WARN / FAIL
  25. @property
  26. def percentage(self) -> float:
  27. return self.score / self.max_score * 100
  28. @dataclass
  29. class SampleEvaluation:
  30. """单个测试样本的完整评估"""
  31. chunk_id: str
  32. dimensions: list = None # list[DimensionScore]
  33. overall_score: float = 0.0
  34. overall_status: str = "PASS"
  35. analysis: str = ""
  36. def __post_init__(self):
  37. if self.dimensions is None:
  38. self.dimensions = []
  39. def compute_overall(self):
  40. if self.dimensions:
  41. self.overall_score = sum(d.score for d in self.dimensions) / len(self.dimensions)
  42. if self.overall_score >= 3.5:
  43. self.overall_status = "PASS"
  44. elif self.overall_score >= 2.0:
  45. self.overall_status = "WARN"
  46. else:
  47. self.overall_status = "FAIL"
# ========================= Evaluator =========================
  49. class RAGEvaluator:
  50. """RAG 管线自动评估器"""
  51. def __init__(self):
  52. from foundation.ai.agent.generate.model_generate import generate_model_client
  53. self.generate_model_client = generate_model_client
  54. def _llm_evaluate(self, prompt: str, task_name: str = "rag_evaluation") -> str:
  55. """调用 LLM 进行评估"""
  56. try:
  57. task_prompt_info = {
  58. "task_prompt": prompt,
  59. "task_name": task_name
  60. }
  61. trace_id = str(uuid.uuid4())
  62. response = self.generate_model_client.get_model_generate_invoke_sync(
  63. trace_id=trace_id,
  64. task_prompt_info=task_prompt_info,
  65. timeout=60,
  66. function_name="review_point_extract" # 复用现有模型配置
  67. )
  68. if isinstance(response, str):
  69. return response.strip()
  70. return str(response)
  71. except Exception as e:
  72. logger.error(f"[RAG评估] LLM 调用失败: {e}")
  73. return ""
  74. def _extract_parameter_tokens(self, text: str) -> set:
  75. """从文本中提取参数 token(数值+单位)"""
  76. patterns = [
  77. r'\d+\.?\d*\s*mm',
  78. r'\d+\.?\d*\s*MPa',
  79. r'\d+\.?\d*\s*℃',
  80. r'\d+\.?\d*\s*%',
  81. r'\d+\.?\d*\s*m³',
  82. r'\d+\.?\d*\s*dB',
  83. r'≤\s*\d+\.?\d*',
  84. r'≥\s*\d+\.?\d*',
  85. r'±\s*\d+\.?\d*',
  86. r'H/\d+',
  87. r'L/\d+',
  88. r'F\.S',
  89. r'\d+\.?\d*\s*μm',
  90. r'\d+\.?\d*\s*με',
  91. ]
  92. tokens = set()
  93. for pattern in patterns:
  94. matches = re.findall(pattern, text, re.IGNORECASE)
  95. tokens.update(m.strip() for m in matches)
  96. return tokens
  97. # ========================= 维度 1: 召回相关性 =========================
  98. def evaluate_recall(self, review_point: dict, results: list) -> DimensionScore:
  99. """评估召回相关性"""
  100. label = review_point.get('label', review_point.get('entity', ''))
  101. queries = review_point.get('search_queries', review_point.get('search_keywords', []))
  102. if not results:
  103. return DimensionScore(
  104. name="召回相关性", score=0, details="无检索结果", status="FAIL"
  105. )
  106. # 规则评估: 检查 top 结果的 rerank_score
  107. top_scores = [r.get('rerank_score', 0) or r.get('bfp_rerank_score', 0) for r in results[:3]]
  108. avg_top = sum(top_scores) / len(top_scores) if top_scores else 0
  109. # 文本相关性: 检查检索文本中是否包含与 label 相关的关键词
  110. label_keywords = set(label) # 中文字符级别
  111. relevance_hits = 0
  112. for r in results[:3]:
  113. text = r.get('text_content', '')
  114. # 检查 label 中的字在检索文本中的覆盖率
  115. hit_count = sum(1 for ch in label_keywords if ch in text and ch.strip())
  116. if len(label_keywords) > 0:
  117. coverage = hit_count / max(len(label_keywords), 1)
  118. if coverage > 0.3:
  119. relevance_hits += 1
  120. # 综合评分
  121. score = 0.0
  122. if avg_top > 0.6:
  123. score += 2.5
  124. elif avg_top > 0.4:
  125. score += 1.5
  126. elif avg_top > 0.2:
  127. score += 0.5
  128. score += min(relevance_hits, 3) * 0.8 # 最多 2.4
  129. score = min(score, 5.0)
  130. status = "PASS" if score >= 3.0 else ("WARN" if score >= 1.5 else "FAIL")
  131. return DimensionScore(
  132. name="召回相关性",
  133. score=round(score, 1),
  134. details=f"avg_top_score={avg_top:.3f}, relevance_hits={relevance_hits}/3, top_scores={[f'{s:.3f}' for s in top_scores]}",
  135. status=status
  136. )
  137. # ========================= 维度 2: 参数匹配度 =========================
  138. def evaluate_parameter_match(self, review_point: dict, results: list) -> DimensionScore:
  139. """评估参数匹配度"""
  140. original_text = review_point.get('original_text', review_point.get('background', ''))
  141. parameter = review_point.get('parameter', '')
  142. if not results:
  143. return DimensionScore(
  144. name="参数匹配度", score=0, details="无检索结果", status="FAIL"
  145. )
  146. # 提取原文中的参数 token
  147. source_text = original_text + " " + parameter
  148. source_params = self._extract_parameter_tokens(source_text)
  149. if not source_params:
  150. return DimensionScore(
  151. name="参数匹配度", score=3.0,
  152. details="原文无可提取参数,中性评分", status="PASS"
  153. )
  154. # 检查检索结果中是否包含相同或相近的参数
  155. retrieved_text = " ".join(r.get('text_content', '') for r in results[:5])
  156. retrieved_params = self._extract_parameter_tokens(retrieved_text)
  157. # 精确匹配
  158. exact_matches = source_params & retrieved_params
  159. match_ratio = len(exact_matches) / len(source_params) if source_params else 0
  160. score = min(match_ratio * 5, 5.0)
  161. status = "PASS" if score >= 3.0 else ("WARN" if score >= 1.5 else "FAIL")
  162. return DimensionScore(
  163. name="参数匹配度",
  164. score=round(score, 1),
  165. details=f"原文参数: {source_params}, 匹配: {exact_matches}, 匹配率: {match_ratio:.0%}",
  166. status=status
  167. )
  168. # ========================= 维度 3: 幻觉风险 =========================
  169. def evaluate_hallucination(self, review_point: dict, content: str) -> DimensionScore:
  170. """评估 search_queries 是否忠实于原文(5=最安全)"""
  171. queries = review_point.get('search_queries', review_point.get('search_keywords', []))
  172. original_text = review_point.get('original_text', review_point.get('background', ''))
  173. if not queries:
  174. return DimensionScore(
  175. name="幻觉风险", score=0, details="无 search_queries", status="FAIL"
  176. )
  177. # 检查 original_text 是否确实在原文中
  178. original_in_content = original_text in content if original_text else False
  179. # 检查 search_queries 中的关键词是否源自原文
  180. query_text = " ".join(queries)
  181. query_keywords = set(re.findall(r'[一-鿿]{2,}', query_text))
  182. content_keywords = set(re.findall(r'[一-鿿]{2,}', content))
  183. # 计算 query 关键词在原文中的覆盖率
  184. if query_keywords:
  185. coverage = len(query_keywords & content_keywords) / len(query_keywords)
  186. else:
  187. coverage = 0.5
  188. # original_text 在原文中的加分
  189. original_bonus = 1.5 if original_in_content else 0
  190. score = min(coverage * 3.5 + original_bonus, 5.0)
  191. status = "PASS" if score >= 3.5 else ("WARN" if score >= 2.0 else "FAIL")
  192. return DimensionScore(
  193. name="幻觉风险",
  194. score=round(score, 1),
  195. details=(
  196. f"original_in_content={original_in_content}, "
  197. f"query_keyword_coverage={coverage:.0%}, "
  198. f"query_keywords_sample={list(query_keywords)[:5]}"
  199. ),
  200. status=status
  201. )
  202. # ========================= 维度 4: 覆盖度 =========================
  203. def evaluate_coverage(self, review_points: list, content: str) -> DimensionScore:
  204. """评估原文关键参数是否被充分提取为审查要点"""
  205. # 提取原文中的参数
  206. content_params = self._extract_parameter_tokens(content)
  207. if not content_params:
  208. return DimensionScore(
  209. name="覆盖度", score=3.0,
  210. details="原文无可提取参数,中性评分", status="PASS"
  211. )
  212. # 提取所有 review_points 中涉及的参数
  213. rp_text = ""
  214. for rp in review_points:
  215. rp_text += " " + rp.get('original_text', rp.get('background', ''))
  216. rp_text += " " + rp.get('parameter', '')
  217. rp_text += " " + rp.get('label', rp.get('entity', ''))
  218. rp_params = self._extract_parameter_tokens(rp_text)
  219. # 计算覆盖率
  220. covered = content_params & rp_params
  221. coverage = len(covered) / len(content_params) if content_params else 0
  222. # 审查要点数量也影响评分(太少 = 覆盖不足)
  223. count_factor = min(len(review_points) / 3, 1.0) # 3个以上满分
  224. score = min(coverage * 3 + count_factor * 2, 5.0)
  225. status = "PASS" if score >= 3.0 else ("WARN" if score >= 1.5 else "FAIL")
  226. return DimensionScore(
  227. name="覆盖度",
  228. score=round(score, 1),
  229. details=(
  230. f"原文参数数: {len(content_params)}, "
  231. f"被覆盖: {len(covered)}, "
  232. f"覆盖率: {coverage:.0%}, "
  233. f"审查要点数: {len(review_points)}"
  234. ),
  235. status=status
  236. )
  237. # ========================= 维度 5: 分数合理性 =========================
  238. def evaluate_score_distribution(self, retrieval_results: list) -> DimensionScore:
  239. """评估 rerank_score 分布是否合理"""
  240. all_scores = []
  241. for results in retrieval_results:
  242. if results:
  243. for r in results:
  244. score = r.get('rerank_score', 0) or 0
  245. if score > 0:
  246. all_scores.append(score)
  247. if not all_scores:
  248. return DimensionScore(
  249. name="分数合理性", score=1.0,
  250. details="无有效 rerank_score", status="WARN"
  251. )
  252. avg_score = sum(all_scores) / len(all_scores)
  253. min_score = min(all_scores)
  254. max_score = max(all_scores)
  255. std_dev = (sum((s - avg_score) ** 2 for s in all_scores) / len(all_scores)) ** 0.5
  256. # 合理性判断
  257. issues = []
  258. if avg_score < 0.2:
  259. issues.append("平均分过低(<0.2)")
  260. if avg_score > 0.9:
  261. issues.append("平均分过高(>0.9),可能缺乏区分度")
  262. if std_dev < 0.05:
  263. issues.append("标准差过小(<0.05),分数过于集中")
  264. if max_score - min_score < 0.1:
  265. issues.append("分数范围过窄")
  266. # 检查是否有极端低分
  267. low_count = sum(1 for s in all_scores if s < 0.1)
  268. if low_count > len(all_scores) * 0.5:
  269. issues.append(f"超过50%的结果分数<0.1 ({low_count}/{len(all_scores)})")
  270. if not issues:
  271. score = 5.0
  272. elif len(issues) == 1:
  273. score = 3.5
  274. elif len(issues) == 2:
  275. score = 2.0
  276. else:
  277. score = 1.0
  278. status = "PASS" if score >= 3.0 else ("WARN" if score >= 2.0 else "FAIL")
  279. return DimensionScore(
  280. name="分数合理性",
  281. score=score,
  282. details=(
  283. f"avg={avg_score:.3f}, min={min_score:.3f}, max={max_score:.3f}, "
  284. f"std={std_dev:.3f}, n={len(all_scores)}, issues={issues}"
  285. ),
  286. status=status
  287. )
  288. # ========================= 综合评估 =========================
  289. def evaluate_sample(self, pipeline_result, content: str) -> SampleEvaluation:
  290. """
  291. 对单个测试样本进行完整 5 维度评估
  292. Args:
  293. pipeline_result: PipelineResult 对象
  294. content: 原始文本
  295. Returns:
  296. SampleEvaluation
  297. """
  298. eval_result = SampleEvaluation(chunk_id=pipeline_result.chunk_id)
  299. if pipeline_result.extract_error or not pipeline_result.review_points:
  300. eval_result.dimensions = [
  301. DimensionScore("召回相关性", 0, details="提取失败", status="FAIL"),
  302. DimensionScore("参数匹配度", 0, details="提取失败", status="FAIL"),
  303. DimensionScore("幻觉风险", 0, details="提取失败", status="FAIL"),
  304. DimensionScore("覆盖度", 0, details="提取失败", status="FAIL"),
  305. DimensionScore("分数合理性", 0, details="提取失败", status="FAIL"),
  306. ]
  307. eval_result.analysis = f"审查要点提取失败: {pipeline_result.extract_error}"
  308. eval_result.compute_overall()
  309. return eval_result
  310. # 维度 1+2: 对每个审查要点评估召回相关性和参数匹配度,取平均
  311. recall_scores = []
  312. param_scores = []
  313. hallucination_scores = []
  314. for i, rp in enumerate(pipeline_result.review_points):
  315. results = (
  316. pipeline_result.retrieval_results[i]
  317. if pipeline_result.retrieval_results and i < len(pipeline_result.retrieval_results)
  318. else []
  319. )
  320. recall_scores.append(self.evaluate_recall(rp, results))
  321. param_scores.append(self.evaluate_parameter_match(rp, results))
  322. hallucination_scores.append(self.evaluate_hallucination(rp, content))
  323. # 取各维度的平均分
  324. avg_recall = sum(d.score for d in recall_scores) / len(recall_scores) if recall_scores else 0
  325. avg_param = sum(d.score for d in param_scores) / len(param_scores) if param_scores else 0
  326. avg_halluc = sum(d.score for d in hallucination_scores) / len(hallucination_scores) if hallucination_scores else 0
  327. # 维度 4: 覆盖度
  328. coverage = self.evaluate_coverage(pipeline_result.review_points, content)
  329. # 维度 5: 分数合理性
  330. score_dist = self.evaluate_score_distribution(pipeline_result.retrieval_results)
  331. eval_result.dimensions = [
  332. DimensionScore("召回相关性", round(avg_recall, 1),
  333. details=f"各要点: {[f'{d.score}' for d in recall_scores]}"),
  334. DimensionScore("参数匹配度", round(avg_param, 1),
  335. details=f"各要点: {[f'{d.score}' for d in param_scores]}"),
  336. DimensionScore("幻觉风险", round(avg_halluc, 1),
  337. details=f"各要点: {[f'{d.score}' for d in hallucination_scores]}"),
  338. coverage,
  339. score_dist,
  340. ]
  341. eval_result.compute_overall()
  342. # 生成分析文本
  343. pass_count = sum(1 for d in eval_result.dimensions if d.status == "PASS")
  344. warn_count = sum(1 for d in eval_result.dimensions if d.status == "WARN")
  345. fail_count = sum(1 for d in eval_result.dimensions if d.status == "FAIL")
  346. eval_result.analysis = (
  347. f"5维度评估: {pass_count} PASS / {warn_count} WARN / {fail_count} FAIL. "
  348. f"审查要点数={pipeline_result.review_point_count}, "
  349. f"检索结果数={pipeline_result.total_retrieved}"
  350. )
  351. return eval_result
  352. # ========================= 报告生成 =========================
  353. def generate_report(self, evaluations: list) -> str:
  354. """
  355. 生成 Markdown 格式的测试报告
  356. Args:
  357. evaluations: list[SampleEvaluation]
  358. Returns:
  359. str: Markdown 报告
  360. """
  361. lines = [
  362. "# RAG 管线测试报告",
  363. "",
  364. f"测试样本数: {len(evaluations)}",
  365. "",
  366. ]
  367. # 汇总表
  368. lines.append("## 汇总表")
  369. lines.append("")
  370. lines.append("| 样本 | 总分 | 状态 | 召回相关性 | 参数匹配度 | 幻觉风险 | 覆盖度 | 分数合理性 |")
  371. lines.append("|------|------|------|-----------|-----------|---------|--------|-----------|")
  372. for ev in evaluations:
  373. dims = {d.name: f"{d.score:.1f}" for d in ev.dimensions}
  374. lines.append(
  375. f"| {ev.chunk_id} | {ev.overall_score:.1f} | {ev.overall_status} | "
  376. f"{dims.get('召回相关性', '-')} | {dims.get('参数匹配度', '-')} | "
  377. f"{dims.get('幻觉风险', '-')} | {dims.get('覆盖度', '-')} | "
  378. f"{dims.get('分数合理性', '-')} |"
  379. )
  380. lines.append("")
  381. # 详细结果
  382. lines.append("## 详细评估")
  383. lines.append("")
  384. for ev in evaluations:
  385. status_emoji = {"PASS": "✅", "WARN": "⚠️", "FAIL": "❌"}.get(ev.overall_status, "?")
  386. lines.append(f"### {status_emoji} {ev.chunk_id} (总分: {ev.overall_score:.1f})")
  387. lines.append("")
  388. lines.append(f"**分析**: {ev.analysis}")
  389. lines.append("")
  390. for dim in ev.dimensions:
  391. dim_emoji = {"PASS": "✅", "WARN": "⚠️", "FAIL": "❌"}.get(dim.status, "?")
  392. lines.append(f"- {dim_emoji} **{dim.name}**: {dim.score:.1f}/5.0")
  393. lines.append(f" - {dim.details}")
  394. lines.append("")
  395. # 全局统计
  396. total_pass = sum(1 for ev in evaluations if ev.overall_status == "PASS")
  397. total_warn = sum(1 for ev in evaluations if ev.overall_status == "WARN")
  398. total_fail = sum(1 for ev in evaluations if ev.overall_status == "FAIL")
  399. lines.append("## 全局统计")
  400. lines.append("")
  401. lines.append(f"- PASS: {total_pass}/{len(evaluations)}")
  402. lines.append(f"- WARN: {total_warn}/{len(evaluations)}")
  403. lines.append(f"- FAIL: {total_fail}/{len(evaluations)}")
  404. if evaluations:
  405. avg_overall = sum(ev.overall_score for ev in evaluations) / len(evaluations)
  406. lines.append(f"- 平均总分: {avg_overall:.1f}/5.0")
  407. return "\n".join(lines)