| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499 |
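"""
RAG pipeline evaluator — automatic five-dimension evaluation of pipeline output using an LLM plus rules.

Evaluation dimensions:
1. Recall relevance (0-5): whether the retrieved specification clauses align semantically with the review point
2. Parameter match (0-5): whether parameter values (mm, MPa, ℃) are matched in the specification clauses
3. Hallucination risk (0-5, 5 = safest): whether search_queries invent concepts that do not exist in the source text
4. Coverage (0-5): whether the key technical parameters of the source text are adequately extracted
5. Score plausibility: whether the rerank_score distribution matches expectations
"""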
- import re
- import json
- import uuid
- from dataclasses import dataclass
- from typing import Optional
- from foundation.observability.logger.loggering import review_logger as logger
# ========================= Evaluation result structures =========================
@dataclass
class DimensionScore:
    """Score awarded to a sample on one evaluation dimension."""

    name: str  # dimension label; reports look scores up by this string
    score: float  # awarded score, on a 0..max_score scale
    max_score: float = 5.0  # upper bound of the scale
    details: str = ""  # free-form explanation of how the score was derived
    status: str = "PASS"  # one of PASS / WARN / FAIL

    @property
    def percentage(self) -> float:
        """Return ``score`` as a percentage of ``max_score``.

        Returns 0.0 when ``max_score`` is zero instead of raising
        ``ZeroDivisionError``.
        """
        if not self.max_score:
            return 0.0
        return self.score / self.max_score * 100
@dataclass
class SampleEvaluation:
    """Complete evaluation of a single test sample."""

    chunk_id: str  # identifier of the evaluated chunk
    # Per-dimension scores (objects exposing ``.score``); None is replaced by
    # a fresh empty list in __post_init__ so instances never share a list.
    dimensions: Optional[list] = None
    overall_score: float = 0.0  # mean of the dimension scores
    overall_status: str = "PASS"  # PASS / WARN / FAIL, derived from overall_score
    analysis: str = ""  # human-readable summary

    def __post_init__(self):
        """Normalise a missing ``dimensions`` argument to an empty list."""
        if self.dimensions is None:
            self.dimensions = []

    def compute_overall(self) -> None:
        """Recompute ``overall_score`` and ``overall_status`` from ``dimensions``.

        The score is the arithmetic mean of the dimension scores (0.0 when
        there are none). The status is always re-derived from that score:
        >= 3.5 is PASS, >= 2.0 is WARN, anything lower is FAIL. An evaluation
        without dimensions therefore reports 0.0 / FAIL rather than keeping
        the constructor default of PASS.
        """
        scores = [dim.score for dim in self.dimensions]
        self.overall_score = sum(scores) / len(scores) if scores else 0.0

        if self.overall_score >= 3.5:
            self.overall_status = "PASS"
        elif self.overall_score >= 2.0:
            self.overall_status = "WARN"
        else:
            self.overall_status = "FAIL"
# ========================= Evaluator =========================
class RAGEvaluator:
    """Automatic evaluator for RAG pipeline output.

    Scores one pipeline result on five dimensions (recall relevance,
    parameter match, hallucination risk, coverage, rerank-score distribution)
    with rule-based heuristics, and renders a Markdown report for a list of
    evaluations. Dimension names, detail texts and report texts are runtime
    strings and are intentionally kept in their original language.
    """

    # Dimension names in report-column order.
    _DIMENSION_NAMES = ("召回相关性", "参数匹配度", "幻觉风险", "覆盖度", "分数合理性")

    # "number + unit", comparison and ratio tokens treated as technical
    # parameters. Compiled once here instead of on every extraction call.
    _PARAMETER_PATTERNS = tuple(
        re.compile(pattern, re.IGNORECASE)
        for pattern in (
            r'\d+\.?\d*\s*mm',
            r'\d+\.?\d*\s*MPa',
            r'\d+\.?\d*\s*℃',
            r'\d+\.?\d*\s*%',
            r'\d+\.?\d*\s*m³',
            r'\d+\.?\d*\s*dB',
            r'≤\s*\d+\.?\d*',
            r'≥\s*\d+\.?\d*',
            r'±\s*\d+\.?\d*',
            r'H/\d+',
            r'L/\d+',
            r'F\.S',
            r'\d+\.?\d*\s*μm',
            r'\d+\.?\d*\s*με',
        )
    )

    # Runs of two or more CJK ideographs, used as "keywords" of a query.
    _CJK_WORD = re.compile(r'[一-鿿]{2,}')

    _STATUS_EMOJI = {"PASS": "✅", "WARN": "⚠️", "FAIL": "❌"}

    def __init__(self):
        """Bind the project's model client used by ``_llm_evaluate``.

        The import is function-scoped, so the model client module is only
        loaded when an evaluator is instantiated.
        """
        from foundation.ai.agent.generate.model_generate import generate_model_client
        self.generate_model_client = generate_model_client

    def _llm_evaluate(self, prompt: str, task_name: str = "rag_evaluation") -> str:
        """Send ``prompt`` to the LLM and return its response as text.

        Args:
            prompt: Full prompt text passed as ``task_prompt``.
            task_name: Task label passed alongside the prompt.

        Returns:
            The stripped response when it is a string, ``str(response)``
            otherwise, or ``""`` when the call raises (the error is logged).
            This is a deliberate best-effort: failures never propagate.
        """
        try:
            task_prompt_info = {
                "task_prompt": prompt,
                "task_name": task_name
            }
            trace_id = str(uuid.uuid4())
            response = self.generate_model_client.get_model_generate_invoke_sync(
                trace_id=trace_id,
                task_prompt_info=task_prompt_info,
                timeout=60,
                function_name="review_point_extract"  # reuse the existing model configuration
            )
            if isinstance(response, str):
                return response.strip()
            return str(response)
        except Exception as e:
            logger.error(f"[RAG评估] LLM 调用失败: {e}")
            return ""

    # ========================= Shared helpers =========================

    @staticmethod
    def _status_for(score: float, pass_at: float, warn_at: float) -> str:
        """Map ``score`` to PASS / WARN / FAIL using the two thresholds."""
        if score >= pass_at:
            return "PASS"
        if score >= warn_at:
            return "WARN"
        return "FAIL"

    @staticmethod
    def _result_score(result: dict) -> float:
        """Return the rerank score of one retrieval hit.

        Reads ``rerank_score`` and falls back to ``bfp_rerank_score``; a
        missing, ``None`` or zero value in both yields 0.
        """
        return result.get('rerank_score') or result.get('bfp_rerank_score') or 0

    @staticmethod
    def _text_field(mapping: dict, key: str, fallback_key: Optional[str] = None) -> str:
        """Read a text field from ``mapping`` without ever returning ``None``.

        ``fallback_key`` is consulted only when ``key`` is absent. ``None``
        becomes ``""`` and non-string values are converted with ``str`` so
        callers can concatenate the result safely.
        """
        default = mapping.get(fallback_key, '') if fallback_key is not None else ''
        value = mapping.get(key, default)
        if value is None:
            return ''
        return value if isinstance(value, str) else str(value)

    def _extract_parameter_tokens(self, text: str) -> set:
        """Extract parameter tokens (value + unit, comparisons, ratios) from text.

        Whitespace inside a token is removed so that "10 mm" and "10mm"
        compare equal when token sets are intersected.

        Args:
            text: Text to scan; empty or ``None`` yields an empty set.

        Returns:
            Set of whitespace-free token strings.
        """
        if not text:
            return set()
        tokens = set()
        for pattern in self._PARAMETER_PATTERNS:
            for match in pattern.findall(text):
                tokens.add("".join(match.split()))
        return tokens

    # ========================= Dimension 1: recall relevance =========================

    def evaluate_recall(self, review_point: dict, results: list) -> "DimensionScore":
        """Score how relevant the top retrieval results are to a review point.

        Combines the average rerank score of the top three results (up to
        2.5 points) with a character-overlap check between the review point's
        label and each of those results (0.8 points per result whose overlap
        exceeds 30%).

        Args:
            review_point: Dict read for ``label`` (or ``entity``).
            results: Retrieval hits (dicts); only the first three are used.

        Returns:
            DimensionScore named "召回相关性".
        """
        label = self._text_field(review_point, 'label', 'entity')

        if not results:
            return DimensionScore(
                name="召回相关性", score=0, details="无检索结果", status="FAIL"
            )

        top_results = results[:3]

        # Rule check: rerank score of the top results.
        top_scores = [self._result_score(r) for r in top_results]
        avg_top = sum(top_scores) / len(top_scores) if top_scores else 0

        # Text relevance at character level. Whitespace is dropped from the
        # label so it neither counts as a hit nor deflates the coverage ratio.
        label_chars = {ch for ch in label if not ch.isspace()}
        relevance_hits = 0
        if label_chars:
            for r in top_results:
                text = r.get('text_content') or ''
                hit_count = sum(1 for ch in label_chars if ch in text)
                if hit_count / len(label_chars) > 0.3:
                    relevance_hits += 1

        # Combined score.
        score = 0.0
        if avg_top > 0.6:
            score += 2.5
        elif avg_top > 0.4:
            score += 1.5
        elif avg_top > 0.2:
            score += 0.5
        score += min(relevance_hits, 3) * 0.8  # at most 2.4
        score = min(score, 5.0)

        return DimensionScore(
            name="召回相关性",
            score=round(score, 1),
            details=f"avg_top_score={avg_top:.3f}, relevance_hits={relevance_hits}/3, top_scores={[f'{s:.3f}' for s in top_scores]}",
            status=self._status_for(score, 3.0, 1.5)
        )

    # ========================= Dimension 2: parameter match =========================

    def evaluate_parameter_match(self, review_point: dict, results: list) -> "DimensionScore":
        """Score how many source parameters reappear in the retrieved text.

        Parameter tokens are extracted from the review point's
        ``original_text`` (or ``background``) plus ``parameter`` and compared
        with tokens extracted from the first five results.

        Returns:
            DimensionScore named "参数匹配度"; a neutral 3.0 when the source
            has no extractable parameters, 0 when there are no results.
        """
        original_text = self._text_field(review_point, 'original_text', 'background')
        parameter = self._text_field(review_point, 'parameter')

        if not results:
            return DimensionScore(
                name="参数匹配度", score=0, details="无检索结果", status="FAIL"
            )

        # Parameter tokens of the source side.
        source_params = self._extract_parameter_tokens(original_text + " " + parameter)
        if not source_params:
            return DimensionScore(
                name="参数匹配度", score=3.0,
                details="原文无可提取参数,中性评分", status="PASS"
            )

        # Parameter tokens of the retrieved side.
        retrieved_text = " ".join((r.get('text_content') or '') for r in results[:5])
        retrieved_params = self._extract_parameter_tokens(retrieved_text)

        # Exact token matches.
        exact_matches = source_params & retrieved_params
        match_ratio = len(exact_matches) / len(source_params)
        score = min(match_ratio * 5, 5.0)

        return DimensionScore(
            name="参数匹配度",
            score=round(score, 1),
            details=f"原文参数: {source_params}, 匹配: {exact_matches}, 匹配率: {match_ratio:.0%}",
            status=self._status_for(score, 3.0, 1.5)
        )

    # ========================= Dimension 3: hallucination risk =========================

    def evaluate_hallucination(self, review_point: dict, content: str) -> "DimensionScore":
        """Score whether the search queries are faithful to the source (5 = safest).

        A query keyword (run of two or more CJK characters) counts as grounded
        when it occurs anywhere in ``content``. The grounded share contributes
        up to 3.5 points; a further 1.5 points are added when the review
        point's ``original_text`` is a verbatim substring of ``content``.

        Args:
            review_point: Dict read for ``search_queries`` (or
                ``search_keywords``) and ``original_text`` (or ``background``).
            content: Source text the review point was extracted from.

        Returns:
            DimensionScore named "幻觉风险".
        """
        queries = review_point.get('search_queries', review_point.get('search_keywords', []))
        original_text = self._text_field(review_point, 'original_text', 'background')

        if not queries:
            return DimensionScore(
                name="幻觉风险", score=0, details="无 search_queries", status="FAIL"
            )

        # A bare string would otherwise be joined character by character.
        if isinstance(queries, str):
            queries = [queries]
        content = content or ''

        # Is the quoted original text really part of the source?
        original_in_content = bool(original_text) and original_text in content

        # Share of query keywords that occur in the source text.
        query_text = " ".join(str(q) for q in queries)
        query_keywords = set(self._CJK_WORD.findall(query_text))
        if query_keywords:
            grounded = sum(1 for keyword in query_keywords if keyword in content)
            coverage = grounded / len(query_keywords)
        else:
            coverage = 0.5  # no CJK keywords to check: neutral value

        original_bonus = 1.5 if original_in_content else 0
        score = min(coverage * 3.5 + original_bonus, 5.0)

        return DimensionScore(
            name="幻觉风险",
            score=round(score, 1),
            details=(
                f"original_in_content={original_in_content}, "
                f"query_keyword_coverage={coverage:.0%}, "
                f"query_keywords_sample={list(query_keywords)[:5]}"
            ),
            status=self._status_for(score, 3.5, 2.0)
        )

    # ========================= Dimension 4: coverage =========================

    def evaluate_coverage(self, review_points: list, content: str) -> "DimensionScore":
        """Score whether the source's key parameters were extracted as review points.

        Up to 3 points come from the share of the source's parameter tokens
        that also appear in the review points' text fields, up to 2 points
        from the number of review points (three or more gives the full 2).

        Returns:
            DimensionScore named "覆盖度"; a neutral 3.0 when the source has
            no extractable parameters.
        """
        content_params = self._extract_parameter_tokens(content)
        if not content_params:
            return DimensionScore(
                name="覆盖度", score=3.0,
                details="原文无可提取参数,中性评分", status="PASS"
            )

        # All text fields of all review points, joined once.
        rp_text = " ".join(
            part
            for rp in review_points
            for part in (
                self._text_field(rp, 'original_text', 'background'),
                self._text_field(rp, 'parameter'),
                self._text_field(rp, 'label', 'entity'),
            )
        )
        rp_params = self._extract_parameter_tokens(rp_text)

        covered = content_params & rp_params
        coverage = len(covered) / len(content_params)

        # Too few review points also signals insufficient coverage.
        count_factor = min(len(review_points) / 3, 1.0)  # full marks from 3 points up
        score = min(coverage * 3 + count_factor * 2, 5.0)

        return DimensionScore(
            name="覆盖度",
            score=round(score, 1),
            details=(
                f"原文参数数: {len(content_params)}, "
                f"被覆盖: {len(covered)}, "
                f"覆盖率: {coverage:.0%}, "
                f"审查要点数: {len(review_points)}"
            ),
            status=self._status_for(score, 3.0, 1.5)
        )

    # ========================= Dimension 5: score plausibility =========================

    def evaluate_score_distribution(self, retrieval_results: list) -> "DimensionScore":
        """Score whether the distribution of rerank scores looks plausible.

        Collects every positive score (``rerank_score``, falling back to
        ``bfp_rerank_score``) over all result lists and flags: mean below 0.2,
        mean above 0.9, population standard deviation below 0.05, range below
        0.1, and more than half of the scores below 0.1. The score drops with
        the number of flags (0 -> 5.0, 1 -> 3.5, 2 -> 2.0, more -> 1.0).

        Args:
            retrieval_results: One list of hits per review point; ``None`` and
                empty entries are tolerated.

        Returns:
            DimensionScore named "分数合理性"; 1.0 / WARN when no positive
            score exists.
        """
        all_scores = []
        for results in retrieval_results or []:
            for r in results or []:
                value = self._result_score(r)
                if value > 0:
                    all_scores.append(value)

        if not all_scores:
            return DimensionScore(
                name="分数合理性", score=1.0,
                details="无有效 rerank_score", status="WARN"
            )

        avg_score = sum(all_scores) / len(all_scores)
        min_score = min(all_scores)
        max_score = max(all_scores)
        std_dev = (sum((s - avg_score) ** 2 for s in all_scores) / len(all_scores)) ** 0.5

        # Plausibility checks.
        issues = []
        if avg_score < 0.2:
            issues.append("平均分过低(<0.2)")
        if avg_score > 0.9:
            issues.append("平均分过高(>0.9),可能缺乏区分度")
        if std_dev < 0.05:
            issues.append("标准差过小(<0.05),分数过于集中")
        if max_score - min_score < 0.1:
            issues.append("分数范围过窄")

        # Majority of extremely low scores.
        low_count = sum(1 for s in all_scores if s < 0.1)
        if low_count > len(all_scores) * 0.5:
            issues.append(f"超过50%的结果分数<0.1 ({low_count}/{len(all_scores)})")

        score = {0: 5.0, 1: 3.5, 2: 2.0}.get(len(issues), 1.0)

        return DimensionScore(
            name="分数合理性",
            score=score,
            details=(
                f"avg={avg_score:.3f}, min={min_score:.3f}, max={max_score:.3f}, "
                f"std={std_dev:.3f}, n={len(all_scores)}, issues={issues}"
            ),
            status=self._status_for(score, 3.0, 2.0)
        )

    # ========================= Combined evaluation =========================

    def evaluate_sample(self, pipeline_result, content: str) -> "SampleEvaluation":
        """Run the full five-dimension evaluation for one test sample.

        Recall relevance, parameter match and hallucination risk are computed
        per review point and averaged; each averaged dimension gets a status
        derived from its (rounded) average using the same thresholds as the
        per-point evaluation. Coverage and score plausibility are computed
        once for the whole sample.

        Args:
            pipeline_result: Object exposing ``chunk_id``, ``extract_error``,
                ``review_points``, ``retrieval_results``,
                ``review_point_count`` and ``total_retrieved``.
            content: Source text of the sample.

        Returns:
            SampleEvaluation with five dimensions, overall score/status and a
            one-line analysis. When extraction failed or produced no review
            points, every dimension is 0 / FAIL.
        """
        eval_result = SampleEvaluation(chunk_id=pipeline_result.chunk_id)
        review_points = pipeline_result.review_points

        if pipeline_result.extract_error or not review_points:
            eval_result.dimensions = [
                DimensionScore(name, 0, details="提取失败", status="FAIL")
                for name in self._DIMENSION_NAMES
            ]
            eval_result.analysis = f"审查要点提取失败: {pipeline_result.extract_error}"
            eval_result.compute_overall()
            return eval_result

        retrieval_results = pipeline_result.retrieval_results or []

        # Dimensions 1-3: evaluate every review point, then average.
        recall_scores = []
        param_scores = []
        hallucination_scores = []
        for i, rp in enumerate(review_points):
            # A review point without a matching result list is scored as
            # "no results".
            results = (retrieval_results[i] if i < len(retrieval_results) else None) or []
            recall_scores.append(self.evaluate_recall(rp, results))
            param_scores.append(self.evaluate_parameter_match(rp, results))
            hallucination_scores.append(self.evaluate_hallucination(rp, content))

        avg_recall = round(sum(d.score for d in recall_scores) / len(recall_scores), 1)
        avg_param = round(sum(d.score for d in param_scores) / len(param_scores), 1)
        avg_halluc = round(
            sum(d.score for d in hallucination_scores) / len(hallucination_scores), 1
        )

        # Dimension 4: coverage.
        coverage = self.evaluate_coverage(review_points, content)

        # Dimension 5: score plausibility.
        score_dist = self.evaluate_score_distribution(retrieval_results)

        eval_result.dimensions = [
            DimensionScore("召回相关性", avg_recall,
                           details=f"各要点: {[f'{d.score}' for d in recall_scores]}",
                           status=self._status_for(avg_recall, 3.0, 1.5)),
            DimensionScore("参数匹配度", avg_param,
                           details=f"各要点: {[f'{d.score}' for d in param_scores]}",
                           status=self._status_for(avg_param, 3.0, 1.5)),
            DimensionScore("幻觉风险", avg_halluc,
                           details=f"各要点: {[f'{d.score}' for d in hallucination_scores]}",
                           status=self._status_for(avg_halluc, 3.5, 2.0)),
            coverage,
            score_dist,
        ]
        eval_result.compute_overall()

        # Analysis text.
        statuses = [d.status for d in eval_result.dimensions]
        pass_count = statuses.count("PASS")
        warn_count = statuses.count("WARN")
        fail_count = statuses.count("FAIL")
        eval_result.analysis = (
            f"5维度评估: {pass_count} PASS / {warn_count} WARN / {fail_count} FAIL. "
            f"审查要点数={pipeline_result.review_point_count}, "
            f"检索结果数={pipeline_result.total_retrieved}"
        )

        return eval_result

    # ========================= Report generation =========================

    def generate_report(self, evaluations: list) -> str:
        """Render a Markdown test report.

        The report has a summary table (one row per sample), a detailed
        section per sample, and global PASS/WARN/FAIL counts with the mean
        overall score.

        Args:
            evaluations: List of SampleEvaluation objects.

        Returns:
            The report as a single newline-joined Markdown string.
        """
        total = len(evaluations)
        lines = [
            "# RAG 管线测试报告",
            "",
            f"测试样本数: {total}",
            "",
        ]

        # Summary table.
        lines.append("## 汇总表")
        lines.append("")
        lines.append("| 样本 | 总分 | 状态 | 召回相关性 | 参数匹配度 | 幻觉风险 | 覆盖度 | 分数合理性 |")
        lines.append("|------|------|------|-----------|-----------|---------|--------|-----------|")
        for ev in evaluations:
            dims = {d.name: f"{d.score:.1f}" for d in ev.dimensions}
            lines.append(
                f"| {ev.chunk_id} | {ev.overall_score:.1f} | {ev.overall_status} | "
                f"{dims.get('召回相关性', '-')} | {dims.get('参数匹配度', '-')} | "
                f"{dims.get('幻觉风险', '-')} | {dims.get('覆盖度', '-')} | "
                f"{dims.get('分数合理性', '-')} |"
            )
        lines.append("")

        # Detailed results.
        lines.append("## 详细评估")
        lines.append("")
        for ev in evaluations:
            status_emoji = self._STATUS_EMOJI.get(ev.overall_status, "?")
            lines.append(f"### {status_emoji} {ev.chunk_id} (总分: {ev.overall_score:.1f})")
            lines.append("")
            lines.append(f"**分析**: {ev.analysis}")
            lines.append("")
            for dim in ev.dimensions:
                dim_emoji = self._STATUS_EMOJI.get(dim.status, "?")
                lines.append(f"- {dim_emoji} **{dim.name}**: {dim.score:.1f}/5.0")
                # Two-space indent so the details nest under the bullet above.
                lines.append(f"  - {dim.details}")
            lines.append("")

        # Global statistics.
        overall_statuses = [ev.overall_status for ev in evaluations]
        lines.append("## 全局统计")
        lines.append("")
        lines.append(f"- PASS: {overall_statuses.count('PASS')}/{total}")
        lines.append(f"- WARN: {overall_statuses.count('WARN')}/{total}")
        lines.append(f"- FAIL: {overall_statuses.count('FAIL')}/{total}")
        if evaluations:
            avg_overall = sum(ev.overall_score for ev in evaluations) / total
            lines.append(f"- 平均总分: {avg_overall:.1f}/5.0")

        return "\n".join(lines)
|