"""
RAG pipeline evaluator: rule-based (plus an LLM hook) scoring of pipeline output
on five dimensions.

Evaluation dimensions:
1. Recall relevance (0-5): are the retrieved specification clauses semantically
   aligned with the review point?
2. Parameter match (0-5): do parameter values (mm, MPa, degrees C) of the review
   point appear in the retrieved clauses?
3. Hallucination risk (0-5, 5 = safest): do the search queries invent concepts
   that are absent from the source text?
4. Coverage (0-5): are the key technical parameters of the source text extracted
   into review points?
5. Score sanity: does the rerank score distribution look reasonable?
"""

import re
import json
import uuid
from dataclasses import dataclass, field
from typing import Optional

from foundation.observability.logger.loggering import review_logger as logger


# Patterns for "number + unit" style parameter tokens. Compiled once at import
# time instead of on every extraction call.
_PARAMETER_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r'\d+\.?\d*\s*mm',
        r'\d+\.?\d*\s*MPa',
        r'\d+\.?\d*\s*℃',
        r'\d+\.?\d*\s*%',
        r'\d+\.?\d*\s*m³',
        r'\d+\.?\d*\s*dB',
        r'≤\s*\d+\.?\d*',
        r'≥\s*\d+\.?\d*',
        r'±\s*\d+\.?\d*',
        r'H/\d+',
        r'L/\d+',
        r'F\.S',
        r'\d+\.?\d*\s*μm',
        r'\d+\.?\d*\s*με',
    )
)

# Runs of two or more characters in U+4E00..U+9FFF (same range as the literal
# character class used previously, spelled with escapes).
_CJK_WORD_RE = re.compile(r'[\u4e00-\u9fff]{2,}')

_WHITESPACE_RE = re.compile(r'\s+')

_STATUS_EMOJI = {"PASS": "✅", "WARN": "⚠️", "FAIL": "❌"}


def _status_for(score: float, pass_at: float, warn_at: float) -> str:
    """Map a numeric score to PASS / WARN / FAIL using the given thresholds."""
    if score >= pass_at:
        return "PASS"
    if score >= warn_at:
        return "WARN"
    return "FAIL"


# ========================= Evaluation result structures =========================

@dataclass
class DimensionScore:
    """Score of a single evaluation dimension."""
    name: str
    score: float  # 0-5
    max_score: float = 5.0
    details: str = ""
    status: str = "PASS"  # PASS / WARN / FAIL

    @property
    def percentage(self) -> float:
        """Score as a percentage of ``max_score`` (0.0 when ``max_score`` is 0)."""
        if not self.max_score:
            return 0.0
        return self.score / self.max_score * 100


@dataclass
class SampleEvaluation:
    """Complete evaluation of one test sample."""
    chunk_id: str
    dimensions: list = field(default_factory=list)  # list[DimensionScore]
    overall_score: float = 0.0
    overall_status: str = "PASS"
    analysis: str = ""

    def __post_init__(self):
        # Callers may still pass ``dimensions=None`` explicitly.
        if self.dimensions is None:
            self.dimensions = []

    def compute_overall(self):
        """Set ``overall_score`` to the mean dimension score and derive the status.

        With no dimensions the score is left untouched; the status is always
        recomputed from the current score (>= 3.5 PASS, >= 2.0 WARN, else FAIL).
        """
        if self.dimensions:
            self.overall_score = (
                sum(d.score for d in self.dimensions) / len(self.dimensions)
            )
        self.overall_status = _status_for(self.overall_score, 3.5, 2.0)


# ========================= Evaluator =========================

class RAGEvaluator:
    """Automatic evaluator for the RAG pipeline."""

    def __init__(self):
        # Imported lazily so that importing this module does not pull in the
        # model client.
        from foundation.ai.agent.generate.model_generate import generate_model_client
        self.generate_model_client = generate_model_client

    def _llm_evaluate(self, prompt: str, task_name: str = "rag_evaluation") -> str:
        """Call the LLM with ``prompt`` and return its response as text.

        Best effort: any exception is logged and an empty string is returned.
        A ``None`` response is also reported as an empty string.
        """
        try:
            task_prompt_info = {
                "task_prompt": prompt,
                "task_name": task_name
            }
            trace_id = str(uuid.uuid4())
            response = self.generate_model_client.get_model_generate_invoke_sync(
                trace_id=trace_id,
                task_prompt_info=task_prompt_info,
                timeout=60,
                function_name="review_point_extract"  # reuse the existing model config
            )
            if response is None:
                return ""
            if isinstance(response, str):
                return response.strip()
            return str(response)
        except Exception as e:
            logger.error(f"[RAG评估] LLM 调用失败: {e}")
            return ""

    @staticmethod
    def _result_score(result: dict) -> float:
        """Return ``rerank_score``, falling back to ``bfp_rerank_score``, else 0.

        Missing keys and ``None`` values are both treated as 0.
        """
        return result.get('rerank_score') or result.get('bfp_rerank_score') or 0

    @staticmethod
    def _average_dimension(name: str, per_point: list,
                           pass_at: float, warn_at: float) -> DimensionScore:
        """Collapse per-review-point scores into one averaged DimensionScore.

        The status is derived from the average with the dimension's own
        thresholds instead of defaulting to PASS.
        """
        average = sum(d.score for d in per_point) / len(per_point) if per_point else 0
        return DimensionScore(
            name,
            round(average, 1),
            details=f"各要点: {[f'{d.score}' for d in per_point]}",
            status=_status_for(average, pass_at, warn_at),
        )

    def _extract_parameter_tokens(self, text: str) -> set:
        """Extract parameter tokens (value + unit, bounds, ratios) from ``text``.

        Whitespace inside a token is removed so that "30 mm" and "30mm" compare
        equal when token sets are intersected.
        """
        if not text:
            return set()
        tokens = set()
        for pattern in _PARAMETER_PATTERNS:
            tokens.update(
                _WHITESPACE_RE.sub('', match) for match in pattern.findall(text)
            )
        return tokens

    # ========================= Dimension 1: recall relevance =========================

    def evaluate_recall(self, review_point: dict, results: list) -> DimensionScore:
        """Score recall relevance for one review point.

        Combines the mean score of the top 3 results (up to 2.5 points) with the
        number of top-3 results whose text contains more than 30% of the distinct
        non-whitespace characters of the label (0.8 points each).

        Args:
            review_point: dict read for ``label`` (fallback ``entity``).
            results: retrieved result dicts; ``rerank_score`` /
                ``bfp_rerank_score`` and ``text_content`` are read.

        Returns:
            DimensionScore named "召回相关性"; FAIL with score 0 when ``results``
            is empty.
        """
        if not results:
            return DimensionScore(
                name="召回相关性", score=0,
                details="无检索结果", status="FAIL"
            )

        label = review_point.get('label', review_point.get('entity', '')) or ''
        top_results = results[:3]

        # Rule-based part: rerank score of the top results.
        top_scores = [self._result_score(r) for r in top_results]
        avg_top = sum(top_scores) / len(top_scores) if top_scores else 0

        # Text relevance: character-level overlap between the label and each
        # retrieved text. Whitespace is excluded from both sides of the ratio.
        label_chars = {ch for ch in label if ch.strip()}
        relevance_hits = 0
        if label_chars:
            for r in top_results:
                text = r.get('text_content') or ''
                hit_count = sum(1 for ch in label_chars if ch in text)
                if hit_count / len(label_chars) > 0.3:
                    relevance_hits += 1

        # Combined score.
        score = 0.0
        if avg_top > 0.6:
            score += 2.5
        elif avg_top > 0.4:
            score += 1.5
        elif avg_top > 0.2:
            score += 0.5

        score += min(relevance_hits, 3) * 0.8  # at most 2.4
        score = min(score, 5.0)

        return DimensionScore(
            name="召回相关性", score=round(score, 1),
            details=f"avg_top_score={avg_top:.3f}, relevance_hits={relevance_hits}/3, top_scores={[f'{s:.3f}' for s in top_scores]}",
            status=_status_for(score, 3.0, 1.5)
        )

    # ========================= Dimension 2: parameter match =========================

    def evaluate_parameter_match(self, review_point: dict, results: list) -> DimensionScore:
        """Score how many source parameter tokens appear in the top 5 results.

        Args:
            review_point: dict read for ``original_text`` (fallback
                ``background``) and ``parameter``.
            results: retrieved result dicts; ``text_content`` is read.

        Returns:
            DimensionScore named "参数匹配度": FAIL/0 without results, a neutral
            3.0 when the source has no extractable parameter, otherwise
            ``match_ratio * 5``.
        """
        if not results:
            return DimensionScore(
                name="参数匹配度", score=0,
                details="无检索结果", status="FAIL"
            )

        original_text = review_point.get('original_text', review_point.get('background', '')) or ''
        parameter = review_point.get('parameter', '') or ''

        # Parameter tokens of the source side (f-string tolerates non-str values).
        source_params = self._extract_parameter_tokens(f"{original_text} {parameter}")

        if not source_params:
            return DimensionScore(
                name="参数匹配度", score=3.0,
                details="原文无可提取参数,中性评分", status="PASS"
            )

        # Parameter tokens of the retrieved side.
        retrieved_text = " ".join(r.get('text_content') or '' for r in results[:5])
        retrieved_params = self._extract_parameter_tokens(retrieved_text)

        # Exact token match.
        exact_matches = source_params & retrieved_params
        match_ratio = len(exact_matches) / len(source_params)

        score = min(match_ratio * 5, 5.0)

        return DimensionScore(
            name="参数匹配度", score=round(score, 1),
            details=f"原文参数: {source_params}, 匹配: {exact_matches}, 匹配率: {match_ratio:.0%}",
            status=_status_for(score, 3.0, 1.5)
        )

    # ========================= Dimension 3: hallucination risk =========================

    def evaluate_hallucination(self, review_point: dict, content: str) -> DimensionScore:
        """Score whether the search queries stay faithful to the source (5 = safest).

        The score is ``coverage * 3.5`` plus a 1.5 bonus when the review point's
        ``original_text`` occurs verbatim in ``content``. ``coverage`` is the
        share of CJK keywords (runs of >= 2 characters) from the queries that
        occur as substrings of ``content``; 0.5 when the queries have no such
        keyword.

        Args:
            review_point: dict read for ``search_queries`` (fallback
                ``search_keywords``) and ``original_text`` (fallback
                ``background``).
            content: the source text.

        Returns:
            DimensionScore named "幻觉风险"; FAIL with score 0 without queries.
        """
        queries = review_point.get('search_queries', review_point.get('search_keywords', []))
        if isinstance(queries, str):
            # A bare string would otherwise be joined character by character.
            queries = [queries]
        original_text = review_point.get('original_text', review_point.get('background', '')) or ''
        content = content or ''

        if not queries:
            return DimensionScore(
                name="幻觉风险", score=0,
                details="无 search_queries", status="FAIL"
            )

        # Does original_text really occur in the source?
        original_in_content = bool(original_text) and original_text in content

        # Do the query keywords originate from the source?
        query_text = " ".join(str(q) for q in queries)
        query_keywords = set(_CJK_WORD_RE.findall(query_text))

        if query_keywords:
            # Substring test: a keyword embedded in a longer sentence of the
            # source still counts as covered.
            covered = sum(1 for keyword in query_keywords if keyword in content)
            coverage = covered / len(query_keywords)
        else:
            coverage = 0.5

        original_bonus = 1.5 if original_in_content else 0

        score = min(coverage * 3.5 + original_bonus, 5.0)

        return DimensionScore(
            name="幻觉风险", score=round(score, 1),
            details=(
                f"original_in_content={original_in_content}, "
                f"query_keyword_coverage={coverage:.0%}, "
                # sorted() keeps the sample stable across runs
                f"query_keywords_sample={sorted(query_keywords)[:5]}"
            ),
            status=_status_for(score, 3.5, 2.0)
        )

    # ========================= Dimension 4: coverage =========================

    def evaluate_coverage(self, review_points: list, content: str) -> DimensionScore:
        """Score whether the source's parameters were extracted into review points.

        ``coverage * 3`` (share of source parameter tokens found in the review
        points) plus up to 2 points for the number of review points (3 or more
        gives the full 2 points).

        Args:
            review_points: review point dicts; ``original_text`` (fallback
                ``background``), ``parameter`` and ``label`` (fallback
                ``entity``) are read.
            content: the source text.

        Returns:
            DimensionScore named "覆盖度"; a neutral 3.0 when the source has no
            extractable parameter.
        """
        content_params = self._extract_parameter_tokens(content)

        if not content_params:
            return DimensionScore(
                name="覆盖度", score=3.0,
                details="原文无可提取参数,中性评分", status="PASS"
            )

        review_points = review_points or []

        # Collect every text field of the review points that may carry parameters.
        fragments = []
        for rp in review_points:
            fragments.append(rp.get('original_text', rp.get('background', '')))
            fragments.append(rp.get('parameter', ''))
            fragments.append(rp.get('label', rp.get('entity', '')))
        rp_text = " ".join(str(fragment) for fragment in fragments if fragment)

        rp_params = self._extract_parameter_tokens(rp_text)

        covered = content_params & rp_params
        coverage = len(covered) / len(content_params)

        # Too few review points also means insufficient coverage.
        count_factor = min(len(review_points) / 3, 1.0)

        score = min(coverage * 3 + count_factor * 2, 5.0)

        return DimensionScore(
            name="覆盖度", score=round(score, 1),
            details=(
                f"原文参数数: {len(content_params)}, "
                f"被覆盖: {len(covered)}, "
                f"覆盖率: {coverage:.0%}, "
                f"审查要点数: {len(review_points)}"
            ),
            status=_status_for(score, 3.0, 1.5)
        )

    # ========================= Dimension 5: score sanity =========================

    def evaluate_score_distribution(self, retrieval_results: list) -> DimensionScore:
        """Score whether the rerank score distribution looks reasonable.

        Collects every positive score (``rerank_score``, falling back to
        ``bfp_rerank_score`` like ``evaluate_recall``) and checks mean, standard
        deviation, range and share of very low scores. Each detected issue
        lowers the score: 0 issues 5.0, 1 issue 3.5, 2 issues 2.0, more 1.0.

        Args:
            retrieval_results: list of result lists (one per review point);
                ``None`` and empty entries are tolerated.

        Returns:
            DimensionScore named "分数合理性"; WARN with score 1.0 when there is
            no positive score.
        """
        all_scores = []
        for results in retrieval_results or []:
            for r in results or []:
                value = self._result_score(r)
                if value > 0:
                    all_scores.append(value)

        if not all_scores:
            return DimensionScore(
                name="分数合理性", score=1.0,
                details="无有效 rerank_score", status="WARN"
            )

        avg_score = sum(all_scores) / len(all_scores)
        min_score = min(all_scores)
        max_score = max(all_scores)
        # Population standard deviation.
        std_dev = (sum((s - avg_score) ** 2 for s in all_scores) / len(all_scores)) ** 0.5

        issues = []
        if avg_score < 0.2:
            issues.append("平均分过低(<0.2)")
        if avg_score > 0.9:
            issues.append("平均分过高(>0.9),可能缺乏区分度")
        if std_dev < 0.05:
            issues.append("标准差过小(<0.05),分数过于集中")
        if max_score - min_score < 0.1:
            issues.append("分数范围过窄")

        # Too many extremely low scores?
        low_count = sum(1 for s in all_scores if s < 0.1)
        if low_count > len(all_scores) * 0.5:
            issues.append(f"超过50%的结果分数<0.1 ({low_count}/{len(all_scores)})")

        score = {0: 5.0, 1: 3.5, 2: 2.0}.get(len(issues), 1.0)

        return DimensionScore(
            name="分数合理性", score=score,
            details=(
                f"avg={avg_score:.3f}, min={min_score:.3f}, max={max_score:.3f}, "
                f"std={std_dev:.3f}, n={len(all_scores)}, issues={issues}"
            ),
            status=_status_for(score, 3.0, 2.0)
        )

    # ========================= Combined evaluation =========================

    def evaluate_sample(self, pipeline_result, content: str) -> SampleEvaluation:
        """
        Run the full five-dimension evaluation for one test sample.

        Dimensions 1-3 are evaluated per review point and averaged; dimensions
        4-5 are evaluated once for the whole sample.

        Args:
            pipeline_result: PipelineResult object; ``chunk_id``,
                ``extract_error``, ``review_points``, ``retrieval_results``,
                ``review_point_count`` and ``total_retrieved`` are read.
            content: the source text.

        Returns:
            SampleEvaluation; all five dimensions are FAIL with score 0 when
            extraction failed or produced no review points.
        """
        eval_result = SampleEvaluation(chunk_id=pipeline_result.chunk_id)

        if pipeline_result.extract_error or not pipeline_result.review_points:
            eval_result.dimensions = [
                DimensionScore(name, 0, details="提取失败", status="FAIL")
                for name in ("召回相关性", "参数匹配度", "幻觉风险", "覆盖度", "分数合理性")
            ]
            eval_result.analysis = f"审查要点提取失败: {pipeline_result.extract_error}"
            eval_result.compute_overall()
            return eval_result

        # Dimensions 1-3: evaluate every review point.
        retrieval = pipeline_result.retrieval_results or []
        recall_scores = []
        param_scores = []
        hallucination_scores = []

        for i, rp in enumerate(pipeline_result.review_points):
            # A review point without a matching result list is scored as
            # "no retrieval results".
            results = retrieval[i] if i < len(retrieval) else []

            recall_scores.append(self.evaluate_recall(rp, results))
            param_scores.append(self.evaluate_parameter_match(rp, results))
            hallucination_scores.append(self.evaluate_hallucination(rp, content))

        # Dimension 4: coverage.
        coverage = self.evaluate_coverage(pipeline_result.review_points, content)

        # Dimension 5: score sanity.
        score_dist = self.evaluate_score_distribution(pipeline_result.retrieval_results)

        # Thresholds mirror those of the corresponding per-point evaluators.
        eval_result.dimensions = [
            self._average_dimension("召回相关性", recall_scores, 3.0, 1.5),
            self._average_dimension("参数匹配度", param_scores, 3.0, 1.5),
            self._average_dimension("幻觉风险", hallucination_scores, 3.5, 2.0),
            coverage,
            score_dist,
        ]

        eval_result.compute_overall()

        # Summary text.
        pass_count = sum(1 for d in eval_result.dimensions if d.status == "PASS")
        warn_count = sum(1 for d in eval_result.dimensions if d.status == "WARN")
        fail_count = sum(1 for d in eval_result.dimensions if d.status == "FAIL")

        eval_result.analysis = (
            f"5维度评估: {pass_count} PASS / {warn_count} WARN / {fail_count} FAIL. "
            f"审查要点数={pipeline_result.review_point_count}, "
            f"检索结果数={pipeline_result.total_retrieved}"
        )

        return eval_result

    # ========================= Report generation =========================

    def generate_report(self, evaluations: list) -> str:
        """
        Build the test report in Markdown.

        The report has a summary table, a detailed section per sample and
        global PASS/WARN/FAIL counts (plus the mean overall score when there is
        at least one evaluation).

        Args:
            evaluations: list[SampleEvaluation]

        Returns:
            str: the Markdown report
        """
        lines = [
            "# RAG 管线测试报告",
            "",
            f"测试样本数: {len(evaluations)}",
            "",
        ]

        # Summary table.
        lines.append("## 汇总表")
        lines.append("")
        lines.append("| 样本 | 总分 | 状态 | 召回相关性 | 参数匹配度 | 幻觉风险 | 覆盖度 | 分数合理性 |")
        lines.append("|------|------|------|-----------|-----------|---------|--------|-----------|")

        for ev in evaluations:
            dims = {d.name: f"{d.score:.1f}" for d in ev.dimensions}
            lines.append(
                f"| {ev.chunk_id} | {ev.overall_score:.1f} | {ev.overall_status} | "
                f"{dims.get('召回相关性', '-')} | {dims.get('参数匹配度', '-')} | "
                f"{dims.get('幻觉风险', '-')} | {dims.get('覆盖度', '-')} | "
                f"{dims.get('分数合理性', '-')} |"
            )

        lines.append("")

        # Detailed results.
        lines.append("## 详细评估")
        lines.append("")

        for ev in evaluations:
            status_emoji = _STATUS_EMOJI.get(ev.overall_status, "?")
            lines.append(f"### {status_emoji} {ev.chunk_id} (总分: {ev.overall_score:.1f})")
            lines.append("")
            lines.append(f"**分析**: {ev.analysis}")
            lines.append("")

            for dim in ev.dimensions:
                dim_emoji = _STATUS_EMOJI.get(dim.status, "?")
                lines.append(f"- {dim_emoji} **{dim.name}**: {dim.score:.1f}/5.0")
                lines.append(f" - {dim.details}")

            lines.append("")

        # Global statistics.
        total_pass = sum(1 for ev in evaluations if ev.overall_status == "PASS")
        total_warn = sum(1 for ev in evaluations if ev.overall_status == "WARN")
        total_fail = sum(1 for ev in evaluations if ev.overall_status == "FAIL")

        lines.append("## 全局统计")
        lines.append("")
        lines.append(f"- PASS: {total_pass}/{len(evaluations)}")
        lines.append(f"- WARN: {total_warn}/{len(evaluations)}")
        lines.append(f"- FAIL: {total_fail}/{len(evaluations)}")

        if evaluations:
            avg_overall = sum(ev.overall_score for ev in evaluations) / len(evaluations)
            lines.append(f"- 平均总分: {avg_overall:.1f}/5.0")

        return "\n".join(lines)