|
|
@@ -0,0 +1,341 @@
|
|
|
+#!/usr/bin/env python
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+词句语法审查 — Prompt 修复验证测试
|
|
|
+
|
|
|
+验证目标:修复 "将A改为A" 的离谱错误
|
|
|
+- 旧 prompt 包含否定示例(如"禁止输出将'设'改为'设'"),反而给模型植入了错误模式
|
|
|
+- 新 prompt 使用肯定式规则("犹豫时输出无明显问题")
|
|
|
+
|
|
|
+测试数据:temp/construction_review/final_result/67d45692fb97aeef8f896e78475ce539-1779781589.json
|
|
|
+其中 chunk[8] 包含触发 bug 的原文:"必须采取充分的安全保证措施"
|
|
|
+
|
|
|
+运行方式:
|
|
|
+ $env:PYTHONPATH = (Get-Location)
|
|
|
+ pytest utils_test/Grammar_Check_Test/test_grammar_check_prompt_fix.py -v -s
|
|
|
+"""
|
|
|
+
|
|
|
+import sys
|
|
|
+import os
|
|
|
+import json
|
|
|
+import re
|
|
|
+import time
|
|
|
+import asyncio
|
|
|
+from pathlib import Path
|
|
|
+
|
|
|
# Make the project root importable: it sits two directory levels above this file.
PROJECT_ROOT = os.path.abspath(
    os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
if PROJECT_ROOT not in sys.path:
    # Prepend so that project packages win over same-named installed ones.
    sys.path.insert(0, PROJECT_ROOT)
|
|
|
+
|
|
|
+import pytest
|
|
|
+
|
|
|
+# ============================================================
|
|
|
+# 测试数据
|
|
|
+# ============================================================
|
|
|
# Path of the recorded review result (relative to the project root) that
# supplies the document chunks used by the tests below.
_RESULT_JSON_PARTS = (
    "temp", "construction_review", "final_result",
    "67d45692fb97aeef8f896e78475ce539-1779781589.json",
)
RESULT_JSON = os.path.join(PROJECT_ROOT, *_RESULT_JSON_PARTS)
|
|
|
+
|
|
|
+
|
|
|
def _load_chunks():
    """Load the document chunks from the recorded review result.

    Reads the UTF-8 JSON file at ``RESULT_JSON`` and returns the value stored
    under ``document_result -> structured_content -> chunks``.
    """
    payload = json.loads(Path(RESULT_JSON).read_text(encoding='utf-8'))
    structured = payload['document_result']['structured_content']
    return structured['chunks']
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================
|
|
|
+# Bug 检测工具函数
|
|
|
+# ============================================================
|
|
|
def detect_a_to_a_pattern(response_text: str) -> list:
    """Detect degenerate "replace X with X" suggestions in a model response.

    Two checks run against the raw response text:

    1. Quoted replacement suggestions whose original and replacement are
       identical after stripping whitespace, for example::

           将"充分"改为"充分"
           把'设'替换为'设'
           建议将“X”修改为“X”

       Straight quotes and curly double/single quotes are recognised.
       Unquoted forms (``将X改为X``) are NOT detected: without quotes the
       boundaries of X are ambiguous in Chinese text.
    2. Self-debate: at least three distinct hedging/rebuttal keywords occur
       anywhere in the response.

    Args:
        response_text: Raw text returned by the model; may be empty or None.

    Returns:
        list: Offending fragments (the matched "将X改为X" text and/or one
        summary string for the self-debate check). Empty when nothing is found.
    """
    if not response_text:
        return []

    issues = []

    # Check 1: quoted "将X改为X". The captured text must not contain a quote
    # character: a greedy ``.{1,10}`` could run across the closing quote and
    # swallow an adjacent suggestion, hiding a genuine A->A occurrence.
    pattern_quoted = re.compile(
        r'(?:将|把)\s*["“\'‘]([^"“”\'‘’]{1,50})["”\'’]'
        r'\s*(?:改为|修改为|替换为|换成)\s*'
        r'["“\'‘]([^"“”\'‘’]{1,50})["”\'’]'
    )
    for m in pattern_quoted.finditer(response_text):
        original, replacement = m.group(1).strip(), m.group(2).strip()
        if original == replacement:
            issues.append(m.group(0))

    # Check 2: self-debate keywords anywhere in the response. Note that this
    # looks at the whole response, not only at the suggestion field.
    debate_keywords = ['然而', '再细看', '重新审视', '其实', '但', '不过', '似乎', '略显生硬']
    found_keywords = [kw for kw in debate_keywords if kw in response_text]
    if len(found_keywords) >= 3:
        issues.append(f"[自我辩论] 响应中包含 {len(found_keywords)} 个犹豫/反驳关键词: "
                      f"{found_keywords}")

    return issues
|
|
|
+
|
|
|
+
|
|
|
def parse_json_from_response(response_text: str) -> list:
    """Extract a JSON result list from a model response.

    Three strategies are tried in order; the first that decodes wins:

    1. Decode the whole text as JSON.
    2. Decode the content of each Markdown fenced code block.
    3. Starting at the first ``[`` (then at the first ``{``), try every
       matching closing bracket, walking from the end of the text backwards.

    A decoded list is returned unchanged and a decoded dict is wrapped in a
    one-element list. Returns ``[]`` for empty input or when nothing decodes.
    """

    def _decode(candidate):
        """Return the decoded list (dicts get wrapped), or None on failure."""
        try:
            parsed = json.loads(candidate)
        except (json.JSONDecodeError, TypeError):
            return None
        if isinstance(parsed, list):
            return parsed
        return [parsed] if isinstance(parsed, dict) else None

    if not response_text:
        return []

    # Strategy 1: the response is pure JSON.
    whole = _decode(response_text)
    if whole is not None:
        return whole

    # Strategy 2: JSON wrapped in a Markdown code fence.
    fenced_blocks = re.findall(r'```(?:json)?\s*\n?(.*?)\n?```', response_text, re.DOTALL)
    for fenced in fenced_blocks:
        decoded = _decode(fenced.strip())
        if decoded is not None:
            return decoded

    # Strategy 3: JSON embedded in surrounding prose.
    for opener, closer in (('[', ']'), ('{', '}')):
        begin = response_text.find(opener)
        if begin < 0:
            continue
        # Widest candidate first, then shrink to each earlier closing bracket.
        stop = response_text.rfind(closer, begin + 1)
        while stop != -1:
            decoded = _decode(response_text[begin:stop + 1])
            if decoded is not None:
                return decoded
            stop = response_text.rfind(closer, begin + 1, stop)

    return []
|
|
|
+
|
|
|
+
|
|
|
+# ============================================================
|
|
|
+# 测试类
|
|
|
+# ============================================================
|
|
|
class TestGrammarCheckPromptFix:
    """Integration tests verifying the grammar-check prompt fix.

    Each test sends recorded document chunks through ``GrammarCheckReviewer``
    and inspects the raw model response for the degenerate "replace A with A"
    suggestion and for self-debating text in the ``suggestion`` field.
    """

    @pytest.fixture(autouse=True)
    def setup(self):
        """Load the recorded chunks and pick the chunk that reproduced the bug.

        Skips the test when the recorded result file is absent, since the
        data lives under ``temp/`` and may not exist on every machine.
        """
        if not os.path.isfile(RESULT_JSON):
            pytest.skip(f"测试数据不存在: {RESULT_JSON}")
        self.chunks = _load_chunks()
        # The bug was reproduced on chunk [8], which contains "采取充分的安全保证措施".
        assert len(self.chunks) > 8, f"测试数据应至少包含 9 个 chunk,实际 {len(self.chunks)} 个"
        self.bug_chunk = self.chunks[8]
        assert '充分' in self.bug_chunk['content'], "chunk[8] 应包含 '充分' 文本"

    @staticmethod
    def _response_text(result) -> str:
        """Return the raw model response from ``result.details``, or ''.

        Tolerates missing details / a missing or null ``response`` entry so
        that a failed review surfaces through the success assertion instead
        of an unrelated AttributeError/TypeError.
        NOTE(review): assumes ``result.details`` is dict-like when present.
        """
        details = result.details or {}
        return details.get('response') or ''

    @staticmethod
    def _is_no_issue(response_text: str) -> bool:
        """Return True when the response is the short "no obvious issue" verdict."""
        return '无明显问题' in response_text and len(response_text) < 50

    @staticmethod
    def _field(issue, key: str, default: str = '') -> str:
        """Return ``issue[key]`` as text.

        Falls back to ``default`` when the issue is not a dict, the key is
        missing, or the value is JSON ``null``; other values are stringified
        so slicing and substring checks never raise.
        """
        value = issue.get(key, default) if isinstance(issue, dict) else default
        return default if value is None else str(value)

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_bug_chunk_no_a_to_a(self):
        """Core test: the original bug chunk no longer yields "replace A with A".

        Chunk [8] is the text that triggered the original bug: the model
        debated with itself about the word "充分" and finally suggested
        replacing '充分' with '充分'.
        """
        from core.construction_review.component.reviewers.grammar_check_reviewer import GrammarCheckReviewer

        reviewer = GrammarCheckReviewer()
        trace_id = f"grammar_fix_test_bug_{int(time.time())}"

        print(f"\n{'='*70}")
        print(f" 测试原 bug chunk: {self.bug_chunk.get('section_label', 'N/A')}")
        print(f" 内容长度: {len(self.bug_chunk['content'])} 字符")
        print(f"{'='*70}")

        start = time.time()
        result = await reviewer.check_grammar(
            trace_id=trace_id,
            review_content=self.bug_chunk['content'],
            state=None,
            stage_name=None,
            enable_thinking=False,
        )
        wall_time = time.time() - start

        print(f"\n 审查耗时: {wall_time:.2f}s")
        print(f" success: {result.success}")

        response_text = self._response_text(result)
        print(f" 响应长度: {len(response_text)} 字符")

        # Did the model answer with the short "no obvious issue" verdict?
        is_no_issue = self._is_no_issue(response_text)
        print(f" 是否无明显问题: {is_no_issue}")

        if not is_no_issue:
            # Parse the JSON result and dump each issue for manual inspection.
            issues = parse_json_from_response(response_text)
            print(f" 发现 {len(issues)} 个问题")
            for idx, issue in enumerate(issues):
                print(f"\n --- 问题 {idx + 1} ---")
                print(f" issue_point: {self._field(issue, 'issue_point', 'N/A')}")
                print(f" location: {self._field(issue, 'location', 'N/A')[:80]}...")
                print(f" suggestion: {self._field(issue, 'suggestion', 'N/A')[:120]}")
                print(f" reason: {self._field(issue, 'reason', 'N/A')[:120]}")
                print(f" risk_level: {self._field(issue, 'risk_level', 'N/A')}")

            # Raw response (truncated) for manual inspection.
            print("\n --- 原始响应 ---")
            print(response_text[:2000])
        else:
            print(f" 原始响应: {response_text}")

        # ===== Assertions =====
        assert result.success, f"审查应成功,实际错误: {result.error_message}"

        # Core assertion: no "replace A with A" pattern may appear.
        a_to_a_issues = detect_a_to_a_pattern(response_text)
        assert not a_to_a_issues, (
            f"检测到 '将A改为A' 模式仍存在!\n"
            f"问题片段: {a_to_a_issues}\n"
            f"完整响应:\n{response_text}"
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_multiple_overview_chunks(self):
        """Extended test: no overview chunk yields "replace A with A".

        Runs every chunk classified as ``overview`` through the reviewer.
        A review that does not succeed fails the test, because an empty
        response would otherwise be counted as clean.
        """
        from core.construction_review.component.reviewers.grammar_check_reviewer import GrammarCheckReviewer

        reviewer = GrammarCheckReviewer()

        # Select the overview chunks.
        overview_chunks = [
            c for c in self.chunks
            if c.get('chapter_classification') == 'overview'
        ]
        if not overview_chunks:
            # Nothing to verify: report a skip rather than a vacuous pass.
            pytest.skip("测试数据中没有 overview chunk,无法执行扩展测试")

        print(f"\n{'='*70}")
        print(f" 扩展测试: {len(overview_chunks)} 个 overview chunks")
        print(f"{'='*70}")

        all_a_to_a_issues = []
        failed_reviews = []

        for idx, chunk in enumerate(overview_chunks):
            trace_id = f"grammar_fix_test_overview_{idx}_{int(time.time())}"
            section = chunk.get('section_label', f'chunk_{idx}')
            content = chunk['content']

            print(f"\n [{idx}] {section} (len={len(content)})")

            start = time.time()
            result = await reviewer.check_grammar(
                trace_id=trace_id,
                review_content=content,
                state=None,
                stage_name=None,
                enable_thinking=False,
            )
            wall_time = time.time() - start

            response_text = self._response_text(result)
            is_no_issue = self._is_no_issue(response_text)
            # Parse once; reused for both the status line and the summary.
            issues = [] if is_no_issue else parse_json_from_response(response_text)

            # Detect the A->A pattern.
            a_to_a = detect_a_to_a_pattern(response_text)
            if is_no_issue:
                status = "[OK] 无明显问题"
            else:
                status = f"[!!] 有 {len(issues)} 个问题"
            if a_to_a:
                status += f" [FAIL] 检测到A->A模式: {a_to_a}"
                all_a_to_a_issues.extend([(section, issue) for issue in a_to_a])
            if not result.success:
                status += f" [FAIL] 审查未成功: {result.error_message}"
                failed_reviews.append((section, result.error_message))

            print(f" 耗时: {wall_time:.2f}s | {status}")

            if not is_no_issue and not a_to_a:
                # Print a short summary of the reported issues.
                for issue in issues:
                    ip = self._field(issue, 'issue_point')[:60]
                    sg = self._field(issue, 'suggestion')[:80]
                    print(f" -> {ip} | 建议: {sg}")

        print(f"\n{'='*70}")
        print(f" 扩展测试完成: {len(overview_chunks)} 个 chunks")
        print(f" A->A 问题数: {len(all_a_to_a_issues)}")
        print(f"{'='*70}")

        # Core assertion.
        assert not all_a_to_a_issues, (
            f"检测到 {len(all_a_to_a_issues)} 个 '将A改为A' 模式!\n"
            + "\n".join(f" {sec}: {issue}" for sec, issue in all_a_to_a_issues)
        )
        # A failed review proves nothing about the prompt, so it must not pass.
        assert not failed_reviews, (
            f"{len(failed_reviews)} 个 chunk 审查未成功,无法验证:\n"
            + "\n".join(f" {sec}: {err}" for sec, err in failed_reviews)
        )

    @pytest.mark.asyncio
    @pytest.mark.integration
    async def test_suggestion_field_concise(self):
        """Format check: ``suggestion`` must be concise and free of reasoning.

        The new prompt asks for the final conclusion only in ``suggestion``
        and forbids self-debate.
        """
        from core.construction_review.component.reviewers.grammar_check_reviewer import GrammarCheckReviewer

        reviewer = GrammarCheckReviewer()
        trace_id = f"grammar_fix_test_concise_{int(time.time())}"

        # Use the chunk that reproduced the bug.
        result = await reviewer.check_grammar(
            trace_id=trace_id,
            review_content=self.bug_chunk['content'],
            state=None,
            stage_name=None,
            enable_thinking=False,
        )

        # Without this, a failed review (empty response) would be reported
        # as "no issue" and pass silently.
        assert result.success, f"审查应成功,实际错误: {result.error_message}"

        response_text = self._response_text(result)
        issues = parse_json_from_response(response_text)

        if not issues:
            print("\n 模型输出'无明显问题',无需验证 suggestion 格式")
            return

        print(f"\n 发现 {len(issues)} 个问题,验证 suggestion 格式:")

        # Keywords that indicate reasoning/self-debate leaked into the field.
        debate_keywords = ['然而', '再细看', '重新审视', '让我们', '再审视']

        for idx, issue in enumerate(issues):
            suggestion = self._field(issue, 'suggestion')
            reason = self._field(issue, 'reason')
            print(f"\n --- 问题 {idx + 1} ---")
            print(f" suggestion ({len(suggestion)}字): {suggestion[:150]}")
            print(f" reason ({len(reason)}字): {reason[:150]}")

            found_debate = [kw for kw in debate_keywords if kw in suggestion]
            assert not found_debate, (
                f"suggestion 字段包含推理过程!\n"
                f"检测到辩论关键词: {found_debate}\n"
                f"suggestion 内容: {suggestion}"
            )

            # A suggestion of 200+ characters most likely contains reasoning.
            assert len(suggestion) < 200, (
                f"suggestion 字段过长({len(suggestion)}字),可能包含推理过程:\n{suggestion}"
            )
|